This closes #550

Clebert Suconic committed 2016-06-01 17:30:51 -04:00
commit e64ea5278f
24 changed files with 1113 additions and 298 deletions

pom.xml (artemis-jdbc-store)

@@ -53,12 +53,19 @@
       <scope>test</scope>
     </dependency>
+    <!-- Database driver support -->
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>artemis-journal</artifactId>

JDBCUtils.java

@@ -23,9 +23,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 
-import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
 public class JDBCUtils {
@@ -69,8 +73,45 @@ public class JDBCUtils {
       if (driverClass.contains("derby")) {
          return new DerbySQLProvider(tableName);
       }
+      else if (driverClass.contains("postgres")) {
+         return new PostgresSQLProvider(tableName);
+      }
+      else if (driverClass.contains("mysql")) {
+         return new MySQLSQLProvider(tableName);
+      }
       else {
          return new GenericSQLProvider(tableName);
       }
    }
 
+   public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
+                                                                 String tableName,
+                                                                 String jdbcConnectionUrl) throws SQLException {
+      JDBCSequentialFileFactoryDriver dbDriver;
+      if (driverClass.contains("derby")) {
+         dbDriver = new JDBCSequentialFileFactoryDriver();
+         dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      else if (driverClass.contains("postgres")) {
+         dbDriver = new PostgresSequentialSequentialFileDriver();
+         dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      else if (driverClass.contains("mysql")) {
+         dbDriver = new JDBCSequentialFileFactoryDriver();
+         dbDriver.setSqlProvider(new MySQLSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      else {
+         dbDriver = new JDBCSequentialFileFactoryDriver();
+         dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      return dbDriver;
+   }
 }
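For reference, a minimal sketch (not part of this commit) of how the two JDBCUtils lookups above might be called. The driver class names are real JDBC drivers; the table name and connection URL are placeholders.

import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;

public class DriverSelectionSketch {
   public static void main(String[] args) throws Exception {
      // "derby" in the driver class name selects DerbySQLProvider, "postgres" selects
      // PostgresSQLProvider, "mysql" selects MySQLSQLProvider, anything else GenericSQLProvider.
      SQLProvider provider = JDBCUtils.getSQLProvider("org.postgresql.Driver", "MESSAGE_FILES");
      System.out.println(provider.getClass().getSimpleName());   // PostgresSQLProvider

      // For the file store a postgres driver class also swaps in PostgresSequentialSequentialFileDriver.
      JDBCSequentialFileFactoryDriver fileDriver =
         JDBCUtils.getDBFileDriver("org.postgresql.Driver", "MESSAGE_FILES", "jdbc:postgresql://localhost/artemis");
      fileDriver.start();   // connects, creates the table if missing, prepares statements
      fileDriver.stop();
   }
}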

AbstractJDBCDriver.java (new file)

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
/**
* Class to hold common database functionality such as drivers and connections
*/
public abstract class AbstractJDBCDriver {
protected Connection connection;
protected SQLProvider sqlProvider;
protected String jdbcConnectionUrl;
protected String jdbcDriverClass;
protected Driver dbDriver;
public AbstractJDBCDriver() {
}
public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.jdbcDriverClass = jdbcDriverClass;
this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName);
}
public void start() throws Exception {
connect();
createSchema();
prepareStatements();
}
public void stop() throws SQLException {
if (sqlProvider.closeConnectionOnShutdown()) {
connection.close();
}
}
protected abstract void prepareStatements() throws SQLException;
protected abstract void createSchema() throws SQLException;
protected void createTable(String schemaSql) throws SQLException {
JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql);
}
protected void connect() throws Exception {
try {
dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
connection = dbDriver.connect(jdbcConnectionUrl, new Properties());
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl);
throw new RuntimeException("Error connecting to database", e);
}
}
public void destroy() throws Exception {
try {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
statement.close();
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public SQLProvider getSqlProvider() {
return sqlProvider;
}
public void setSqlProvider(SQLProvider sqlProvider) {
this.sqlProvider = sqlProvider;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcDriverClass() {
return jdbcDriverClass;
}
public void setJdbcDriverClass(String jdbcDriverClass) {
this.jdbcDriverClass = jdbcDriverClass;
}
}

DerbySQLProvider.java

@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
+package org.apache.activemq.artemis.jdbc.store.drivers.derby;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
 
 public class DerbySQLProvider extends GenericSQLProvider {
@@ -46,7 +48,12 @@ public class DerbySQLProvider extends GenericSQLProvider {
    }
 
    @Override
-   public String getAppendToFileSQL() {
+   public String getAppendToLargeObjectSQL() {
       return appendToFileSQL;
    }
+
+   @Override
+   public boolean closeConnectionOnShutdown() {
+      return false;
+   }
 }

MySQLSQLProvider.java (new file)

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers.mysql;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
public class MySQLSQLProvider extends GenericSQLProvider {
private static final int MAX_BLOB_SIZE = 4 * 1024 * 1024 * 1024; // 4GB
private final String createFileTableSQL;
private final String createJournalTableSQL;
private final String copyFileRecordByIdSQL;
public MySQLSQLProvider(String tName) {
super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INTEGER NOT NULL AUTO_INCREMENT," +
"FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA LONGBLOB, PRIMARY KEY(ID)) ENGINE=InnoDB;";
createJournalTableSQL = "CREATE TABLE " + tableName +
"(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB;";
copyFileRecordByIdSQL = " UPDATE " + tableName + ", (SELECT DATA AS FROM_DATA FROM " + tableName +
" WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?;";
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getCreateJournalTableSQL() {
return createJournalTableSQL;
}
@Override
public String getCopyFileRecordByIdSQL() {
return copyFileRecordByIdSQL;
}
}

PostgresSQLProvider.java (new file)

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
public class PostgresSQLProvider extends GenericSQLProvider {
// BYTEA Size used in Journal
private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
private final String createFileTableSQL;
private final String createJournalTableSQL;
public PostgresSQLProvider(String tName) {
super(tName.toLowerCase());
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)";
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getCreateJournalTableSQL() {
return createJournalTableSQL;
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
}

PostgresSequentialSequentialFileDriver.java (new file)

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
public PostgresSequentialSequentialFileDriver() throws SQLException {
super();
}
@Override
public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
try {
connection.setAutoCommit(false);
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
long oid = lobjManager.createLO();
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
createFile.setLong(3, oid);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
file.setId(keys.getInt(1));
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
@Override
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition(getPostGresLargeObjectSize(file));
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
@Override
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
Long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
largeObject.seek(largeObject.size());
largeObject.write(data);
largeObject.close();
connection.commit();
}
catch (Exception e) {
connection.rollback();
throw e;
}
return data.length;
}
@Override
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
LargeObject largeObject = null;
long oid = getOID(file);
try {
connection.setAutoCommit(false);
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
if (readLength > 0) {
if (file.position() > 0) largeObject.seek((int) file.position());
byte[] data = largeObject.read(readLength);
bytes.put(data);
}
largeObject.close();
connection.commit();
return readLength;
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
if (oid == null) {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
System.out.println("FD");
}
return (Long) file.getMetaData(POSTGRES_OID_KEY);
}
private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
int size = 0;
Long oid = getOID(file);
if (oid != null) {
try {
connection.setAutoCommit(false);
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
size = largeObject.size();
largeObject.close();
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
return size;
}
}
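The driver above relies on the PostgreSQL Large Object API, which only works inside a transaction, hence the setAutoCommit(false)/commit pairs around every operation. A standalone sketch of that round trip (not part of this commit; the connection URL is a placeholder):

import java.sql.Connection;
import java.sql.DriverManager;
import org.postgresql.PGConnection;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;

public class LargeObjectSketch {
   public static void main(String[] args) throws Exception {
      try (Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost/artemis")) {
         connection.setAutoCommit(false);                            // LO calls must run in a transaction
         LargeObjectManager lobs = ((PGConnection) connection).getLargeObjectAPI();
         long oid = lobs.createLO();                                 // this OID is what the DATA OID column stores
         LargeObject lo = lobs.open(oid, LargeObjectManager.WRITE);
         lo.write("hello".getBytes());                               // appended bytes live outside the row
         lo.close();
         connection.commit();
      }
   }
}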

JDBCSequentialFile.java

@@ -19,24 +19,20 @@ package org.apache.activemq.artemis.jdbc.store.file;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.jboss.logging.Logger;
 
 public class JDBCSequentialFile implements SequentialFile {
@@ -53,20 +49,6 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private int id = -1;
 
-   private final PreparedStatement appendToFile;
-
-   private final PreparedStatement deleteFile;
-
-   private final PreparedStatement readFile;
-
-   private final PreparedStatement createFile;
-
-   private final PreparedStatement selectFileByFileName;
-
-   private final PreparedStatement copyFileRecord;
-
-   private final PreparedStatement renameFile;
-
    private long readPosition = 0;
 
    private long writePosition = 0;
@@ -75,33 +57,28 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private JDBCSequentialFileFactory fileFactory;
 
-   private int maxSize;
-
-   private SQLProvider sqlProvider;
-
    private final Object writeLock;
 
+   private final JDBCSequentialFileFactoryDriver dbDriver;
+
+   // Allows DB Drivers to cache meta data.
+   private Map<Object, Object> metaData = new ConcurrentHashMap<>();
+
    public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
                              final String filename,
-                             final SQLProvider sqlProvider,
                              final Executor executor,
+                             final JDBCSequentialFileFactoryDriver driver,
                              final Object writeLock) throws SQLException {
       this.fileFactory = fileFactory;
       this.filename = filename;
       this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
       this.executor = executor;
-      this.maxSize = sqlProvider.getMaxBlobSize();
-      this.sqlProvider = sqlProvider;
       this.writeLock = writeLock;
+      this.dbDriver = driver;
+   }
 
-      Connection connection = fileFactory.getConnection();
-      this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
-      this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+   public void setWritePosition(int writePosition) {
+      this.writePosition = writePosition;
    }
 
    @Override
@@ -117,37 +94,13 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public synchronized void open() throws Exception {
       if (!isOpen) {
-         try {
-            synchronized (writeLock) {
-               selectFileByFileName.setString(1, filename);
-               try (ResultSet rs = selectFileByFileName.executeQuery()) {
-                  if (!rs.next()) {
-                     createFile.setString(1, filename);
-                     createFile.setString(2, extension);
-                     createFile.setBytes(3, new byte[0]);
-                     createFile.executeUpdate();
-                     try (ResultSet keys = createFile.getGeneratedKeys()) {
-                        keys.next();
-                        this.id = keys.getInt(1);
-                     }
-                  }
-                  else {
-                     this.id = rs.getInt(1);
-                     this.writePosition = rs.getBlob(4).length();
-                  }
-               }
-            }
-         }
-         catch (SQLException e) {
-            ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e);
-            isOpen = false;
-         }
-
-         isCreated = true;
-         isOpen = true;
+         synchronized (writeLock) {
+            dbDriver.openFile(this);
+            isCreated = true;
+            isOpen = true;
+         }
       }
    }
 
    @Override
    public void open(int maxIO, boolean useExecutor) throws Exception {
@@ -156,7 +109,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public boolean fits(int size) {
-      return writePosition + size <= maxSize;
+      return writePosition + size <= dbDriver.getMaxSize();
    }
 
    @Override
@@ -183,24 +136,20 @@ public class JDBCSequentialFile implements SequentialFile {
    public void delete() throws IOException, InterruptedException, ActiveMQException {
       try {
          if (isCreated) {
-            deleteFile.setInt(1, id);
-            deleteFile.executeUpdate();
+            synchronized (writeLock) {
+               dbDriver.deleteFile(this);
+            }
          }
       }
       catch (SQLException e) {
-         throw new IOException(e);
+         throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e);
       }
    }
 
    private synchronized int internalWrite(byte[] data, IOCallback callback) {
       try {
         synchronized (writeLock) {
-            int noBytes = data.length;
-            appendToFile.setBytes(1, data);
-            appendToFile.setInt(2, id);
-            int result = appendToFile.executeUpdate();
-            if (result < 1)
-               throw new ActiveMQException("No record found for file id: " + id);
+            int noBytes = dbDriver.writeToFile(this, data);
            seek(noBytes);
            if (callback != null)
               callback.done();
@@ -295,36 +244,19 @@ public class JDBCSequentialFile implements SequentialFile {
    }
 
    @Override
-   public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
+   public synchronized int read(ByteBuffer bytes, final IOCallback callback) throws SQLException {
       synchronized (writeLock) {
-         readFile.setInt(1, id);
-         try (ResultSet rs = readFile.executeQuery()) {
-            if (rs.next()) {
-               Blob blob = rs.getBlob(1);
-               long bytesRemaining = blob.length() - readPosition;
-               byte[] data;
-               if (bytesRemaining > bytes.remaining()) {
-                  // First index into blob is 1 (not 0)
-                  data = blob.getBytes(readPosition + 1, bytes.remaining());
-               }
-               else {
-                  // First index into blob is 1 (not 0)
-                  data = blob.getBytes(readPosition + 1, (int) bytesRemaining);
-               }
-               bytes.put(data);
-               readPosition += data.length;
+         try {
+            int read = dbDriver.readFromFile(this, bytes);
+            readPosition += read;
            if (callback != null)
               callback.done();
-
-               return data.length;
-            }
-            return 0;
+            return read;
         }
         catch (Exception e) {
            if (callback != null)
               callback.onError(-1, e.getMessage());
+            e.printStackTrace();
           return 0;
        }
     }
@@ -352,8 +284,20 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public void sync() throws IOException {
-      // (mtaylor) We always write straight away, so we don't need to do anything here.
-      // (mtaylor) Is this meant to be blocking?
+      final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
+      executor.execute(new Runnable() {
+         @Override
+         public void run() {
+            callback.done();
+         }
+      });
+
+      try {
+         callback.waitCompletion();
+      }
+      catch (Exception e) {
+         throw new IOException(e);
+      }
    }
 
    @Override
@@ -363,15 +307,15 @@ public class JDBCSequentialFile implements SequentialFile {
 
    @Override
    public void renameTo(String newFileName) throws Exception {
-      renameFile.setString(1, newFileName);
-      renameFile.setInt(2, id);
-      renameFile.executeUpdate();
+      synchronized (writeLock) {
+         dbDriver.renameFile(this, newFileName);
+      }
    }
 
    @Override
    public SequentialFile cloneFile() {
       try {
-         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
+         JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock);
          return clone;
       }
       catch (Exception e) {
@@ -385,9 +329,9 @@ public class JDBCSequentialFile implements SequentialFile {
      JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
      clone.open();
 
-      copyFileRecord.setInt(1, id);
-      copyFileRecord.setInt(2, clone.getId());
-      copyFileRecord.executeUpdate();
+      synchronized (writeLock) {
+         dbDriver.copyFileData(this, clone);
+      }
    }
 
    public int getId() {
@@ -416,4 +360,16 @@ public class JDBCSequentialFile implements SequentialFile {
    public File getJavaFile() {
       return null;
    }
+
+   public void addMetaData(Object key, Object value) {
+      metaData.put(key, value);
+   }
+
+   public Object removeMetaData(Object key) {
+      return metaData.remove(key);
+   }
+
+   public Object getMetaData(Object key) {
+      return metaData.get(key);
+   }
 }

JDBCSequentialFileFactory.java

@@ -18,17 +18,11 @@ package org.apache.activemq.artemis.jdbc.store.file;
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -36,53 +30,59 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
-   private Connection connection;
-
-   private String connectionUrl;
-
-   private final Driver driver;
-
    private boolean started;
 
-   private final String tableName;
-
    private List<JDBCSequentialFile> files;
 
-   private PreparedStatement selectFileNamesByExtension;
-
    private Executor executor;
 
-   private SQLProvider sqlProvider;
-
    private Map<String, Object> fileLocks = new HashMap<>();
 
+   private final JDBCSequentialFileFactoryDriver dbDriver;
+
    public JDBCSequentialFileFactory(final String connectionUrl,
                                     final String tableName,
                                     final String className,
                                     Executor executor) throws Exception {
-      this.connectionUrl = connectionUrl;
       this.executor = executor;
-      this.tableName = tableName.toUpperCase();
       files = new ArrayList<>();
-      sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName);
-      driver = JDBCUtils.getDriver(className);
+      dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl);
    }
 
-   public Connection getConnection() {
-      return connection;
+   @Override
+   public synchronized void start() {
+      try {
+         if (!started) {
+            dbDriver.start();
+            started = true;
+         }
+      }
+      catch (Exception e) {
+         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
+         started = false;
+      }
+   }
+
+   @Override
+   public synchronized void stop() {
+      try {
+         dbDriver.stop();
+      }
+      catch (SQLException e) {
+         ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
+      }
+      started = false;
    }
 
    @Override
    public SequentialFile createSequentialFile(String fileName) {
       try {
          fileLocks.putIfAbsent(fileName, new Object());
-         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName));
+         JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName));
          files.add(file);
         return file;
      }
@@ -99,15 +99,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
    @Override
    public List<String> listFiles(String extension) throws Exception {
-      List<String> fileNames = new ArrayList<>();
-
-      selectFileNamesByExtension.setString(1, extension);
-      try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
-         while (rs.next()) {
-            fileNames.add(rs.getString(1));
-         }
-      }
-      return fileNames;
+      return dbDriver.listFiles(extension);
    }
 
    @Override
@@ -171,7 +163,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
    @Override
    public void activateBuffer(SequentialFile file) {
    }
-
 
    @Override
@@ -179,34 +170,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
       return null;
    }
 
-   @Override
-   public synchronized void start() {
-      try {
-         if (!started) {
-            connection = driver.connect(connectionUrl, new Properties());
-            JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
-            selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
-            started = true;
-         }
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
-         started = false;
-      }
-   }
-
-   @Override
-   public synchronized void stop() {
-      try {
-         if (false)
-            connection.close();
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
-      }
-      started = false;
-   }
-
    @Override
    public boolean isStarted() {
       return started;
@@ -218,12 +181,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
 
    @Override
    public void flush() {
    }
 
    public synchronized void destroy() throws SQLException {
-      Statement statement = connection.createStatement();
-      statement.executeUpdate(sqlProvider.getDropFileTableSQL());
-      stop();
+      dbDriver.destroy();
    }
 }

JDBCSequentialFileFactoryDriver.java (new file)

@@ -0,0 +1,323 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
protected PreparedStatement deleteFile;
protected PreparedStatement createFile;
protected PreparedStatement selectFileByFileName;
protected PreparedStatement copyFileRecord;
protected PreparedStatement renameFile;
protected PreparedStatement readLargeObject;
protected PreparedStatement appendToLargeObject;
protected PreparedStatement selectFileNamesByExtension;
public JDBCSequentialFileFactoryDriver() {
super();
}
public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
super(tableName, jdbcConnectionUrl, jdbcDriverClass);
}
public void start() throws Exception {
super.start();
}
@Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateFileTableSQL());
}
@Override
protected void prepareStatements() throws SQLException {
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
}
public synchronized List<String> listFiles(String extension) throws Exception {
List<String> fileNames = new ArrayList<>();
try {
connection.setAutoCommit(false);
selectFileNamesByExtension.setString(1, extension);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
while (rs.next()) {
fileNames.add(rs.getString(1));
}
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
return fileNames;
}
/**
* Opens the supplied file. If the file does not exist in the database it will create a new one.
*
* @param file
* @return
* @throws SQLException
*/
public void openFile(JDBCSequentialFile file) throws SQLException {
int fileId = fileExists(file);
if (fileId < 0) {
createFile(file);
}
else {
file.setId(fileId);
loadFile(file);
}
}
/**
* Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1.
*
* @param file
* @return
* @throws SQLException
*/
public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false);
selectFileByFileName.setString(1, file.getFileName());
try (ResultSet rs = selectFileByFileName.executeQuery()) {
int id = rs.next() ? rs.getInt(1) : -1;
connection.commit();
return id;
}
catch (Exception e) {
connection.rollback();
throw e;
}
}
/**
* Loads an existing file.
*
* @param file
* @throws SQLException
*/
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
file.setWritePosition((int) rs.getBlob(1).length());
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Creates a new database row representing the supplied file.
*
* @param file
* @throws SQLException
*/
public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
try {
connection.setAutoCommit(false);
createFile.setString(1, file.getFileName());
createFile.setString(2, file.getExtension());
createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
file.setId(keys.getInt(1));
}
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Updates the fileName field to the new value.
*
* @param file
* @param newFileName
* @throws SQLException
*/
public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
try {
connection.setAutoCommit(false);
renameFile.setString(1, newFileName);
renameFile.setInt(2, file.getId());
renameFile.executeUpdate();
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Deletes the associated row in the database.
*
* @param file
* @throws SQLException
*/
public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
try {
connection.setAutoCommit(false);
deleteFile.setInt(1, file.getId());
deleteFile.executeUpdate();
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Persists data to this files associated database mapping.
*
* @param file
* @param data
* @return
* @throws Exception
*/
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
try {
connection.setAutoCommit(false);
appendToLargeObject.setBytes(1, data);
appendToLargeObject.setInt(2, file.getId());
appendToLargeObject.executeUpdate();
connection.commit();
return data.length;
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Reads data from the file (at file.readPosition) into the byteBuffer.
*
* @param file
* @param bytes
* @return
* @throws Exception
*/
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
connection.setAutoCommit(false);
readLargeObject.setInt(1, file.getId());
int readLength = 0;
try (ResultSet rs = readLargeObject.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
bytes.put(data);
}
connection.commit();
return readLength;
}
catch (Throwable e) {
connection.rollback();
throw e;
}
}
/**
* Copy the data content of FileFrom to FileTo
*
* @param fileFrom
* @param fileTo
* @throws SQLException
*/
public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
try {
connection.setAutoCommit(false);
copyFileRecord.setInt(1, fileFrom.getId());
copyFileRecord.setInt(2, fileTo.getId());
copyFileRecord.executeUpdate();
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
/**
* Drop all tables and data
*/
public synchronized void destroy() throws SQLException {
try {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.executeUpdate(sqlProvider.getDropFileTableSQL());
connection.commit();
}
catch (SQLException e) {
connection.rollback();
throw e;
}
}
public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
long bytesRemaining = objectLength - readPosition;
if (bytesRemaining > bufferSpace) {
return bufferSpace;
}
else {
return bytesRemaining;
}
}
public int getMaxSize() {
return sqlProvider.getMaxBlobSize();
}
}
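A short sketch (not part of this commit) of reaching this driver through JDBCSequentialFileFactory; the embedded Derby URL, table name and file name are hypothetical placeholders.

import java.util.concurrent.Executors;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;

public class FileFactorySketch {
   public static void main(String[] args) throws Exception {
      JDBCSequentialFileFactory factory = new JDBCSequentialFileFactory(
         "jdbc:derby:target/file-store;create=true",        // hypothetical embedded Derby URL
         "MESSAGE_FILES",                                    // hypothetical table name
         "org.apache.derby.jdbc.EmbeddedDriver",
         Executors.newSingleThreadExecutor());
      factory.start();                                       // delegates to JDBCSequentialFileFactoryDriver.start()
      SequentialFile file = factory.createSequentialFile("file-1.amq");
      file.open();                                           // inserts the row on first open
      System.out.println(factory.listFiles("amq"));          // expected: [file-1.amq]
      factory.stop();
   }
}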

JDBCJournalImpl.java

@@ -17,21 +17,15 @@
 package org.apache.activemq.artemis.jdbc.store.journal;
 
-import java.sql.Connection;
-import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -44,23 +38,18 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.jboss.logging.Logger;
 
-public class JDBCJournalImpl implements Journal {
+public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
 
    // Sync Delay in ms
    public static final int SYNC_DELAY = 5;
 
    private static int USER_VERSION = 1;
 
-   private final String tableName;
-
-   private final String jdbcDriverClass;
-
-   private Connection connection;
-
-   private List<JDBCJournalRecord> records;
+   private final List<JDBCJournalRecord> records;
 
    private PreparedStatement insertJournalRecords;
@@ -74,13 +63,9 @@ public class JDBCJournalImpl implements Journal {
 
    private boolean started;
 
-   private String jdbcUrl;
-
    private Timer syncTimer;
 
-   private Driver dbDriver;
-
-   private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
+   private final Object journalLock = new Object();
 
    private final String timerThread;
@@ -90,68 +75,49 @@ public class JDBCJournalImpl implements Journal {
 
    // Sequence ID for journal records
    private AtomicLong seq = new AtomicLong(0);
 
-   public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
-      this.tableName = tableName;
-      this.jdbcUrl = jdbcUrl;
-      this.jdbcDriverClass = jdbcDriverClass;
-      timerThread = "Timer JDBC Journal(" + tableName + ")";
+   private Logger logger = Logger.getLogger(this.getClass());
 
+   public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) {
+      super(tableName, jdbcUrl, jdbcDriverClass);
+      timerThread = "Timer JDBC Journal(" + tableName + ")";
       records = new ArrayList<>();
    }
 
    @Override
    public void start() throws Exception {
-      dbDriver = JDBCUtils.getDriver(jdbcDriverClass);
-
-      try {
-         connection = dbDriver.connect(jdbcUrl, new Properties());
-      }
-      catch (SQLException e) {
-         ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcUrl);
-         throw new RuntimeException("Error connecting to database", e);
-      }
-
-      JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName));
-
-      insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
-      selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
-      countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
-      deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
-      deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName));
+      super.start();
 
       syncTimer = new Timer(timerThread, true);
       syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
 
       started = true;
    }
 
+   protected void createSchema() throws SQLException {
+      createTable(sqlProvider.getCreateJournalTableSQL());
+   }
+
    @Override
-   public void stop() throws Exception {
-      stop(true);
+   protected void prepareStatements() throws SQLException {
+      insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
+      selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
+      countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL());
+      deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL());
+      deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL());
    }
 
-   public synchronized void stop(boolean shutdownConnection) throws Exception {
+   @Override
+   public synchronized void stop() throws SQLException {
       if (started) {
-         journalLock.writeLock().lock();
-
-         syncTimer.cancel();
-
-         sync();
-         if (shutdownConnection) {
-            connection.close();
-         }
+         synchronized (journalLock) {
+            syncTimer.cancel();
+            sync();
            started = false;
-         journalLock.writeLock().unlock();
+            super.stop();
+         }
       }
    }
 
    public synchronized void destroy() throws Exception {
-      connection.setAutoCommit(false);
-      Statement statement = connection.createStatement();
-      statement.executeUpdate("DROP TABLE " + tableName);
-      statement.close();
-      connection.commit();
-
+      super.destroy();
       stop();
    }
@@ -159,8 +125,11 @@ public class JDBCJournalImpl implements Journal {
      if (!started)
         return 0;
 
-      List<JDBCJournalRecord> recordRef = records;
-      records = new ArrayList<JDBCJournalRecord>();
+      List<JDBCJournalRecord> recordRef = new ArrayList<>();
+      synchronized (records) {
+         recordRef.addAll(records);
+         records.clear();
+      }
 
      // We keep a list of deleted records and committed tx (used for cleaning up old transaction data).
      List<Long> deletedRecords = new ArrayList<>();
@@ -215,12 +184,18 @@ public class JDBCJournalImpl implements Journal {
         deleteJournalTxRecords.executeBatch();
 
         connection.commit();
 
-         cleanupTxRecords(deletedRecords, committedTransactions);
         success = true;
      }
      catch (SQLException e) {
-         performRollback(connection, recordRef);
+         performRollback(recordRef);
+      }
+
+      try {
+         if (success)
+            cleanupTxRecords(deletedRecords, committedTransactions);
+      }
+      catch (SQLException e) {
+         e.printStackTrace();
      }
 
      executeCallbacks(recordRef, success);
@@ -230,12 +205,11 @@ public class JDBCJournalImpl implements Journal {
   /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
      we remove the Tx Records (i.e. PREPARE, COMMIT). */
   private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
-      connection.rollback();
-
      List<RecordInfo> iterableCopy;
      List<TransactionHolder> iterableCopyTx = new ArrayList<>();
      iterableCopyTx.addAll(transactions.values());
 
      for (Long txId : committedTx) {
         transactions.get(txId).committed = true;
      }
@@ -260,9 +234,8 @@ public class JDBCJournalImpl implements Journal {
        }
     }
 
-   private void performRollback(Connection connection, List<JDBCJournalRecord> records) {
+   private void performRollback(List<JDBCJournalRecord> records) {
      try {
-         connection.rollback();
-
        for (JDBCJournalRecord record : records) {
           if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
              removeTxRecord(record);
@@ -306,18 +279,18 @@ public class JDBCJournalImpl implements Journal {
        record.setIoCompletion(callback);
     }
 
-      try {
-         journalLock.writeLock().lock();
+      synchronized (journalLock) {
        if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
           addTxRecord(record);
        }
+         synchronized (records) {
           records.add(record);
        }
     }
-      finally {
-         journalLock.writeLock().unlock();
-      }
 
-      if (callback != null) callback.waitCompletion();
+      if (callback != null)
+         callback.waitCompletion();
   }
 
@@ -703,12 +676,12 @@ public class JDBCJournalImpl implements Journal {
 
   @Override
   public final void synchronizationLock() {
-      journalLock.writeLock().lock();
+      logger.error("Replication is not supported with JDBC Store");
   }
 
   @Override
   public final void synchronizationUnlock() {
-      journalLock.writeLock().unlock();
+      logger.error("Replication is not supported with JDBC Store");
   }
 
   @Override

GenericSQLProvider.java

@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.jdbc.store.file.sql; package org.apache.activemq.artemis.jdbc.store.sql;
public class GenericSQLProvider implements SQLProvider { public class GenericSQLProvider implements SQLProvider {
// Default to lowest (MYSQL = 64k) // Default to lowest (MYSQL = 64k)
private static final int MAX_BLOB_SIZE = 64512; private static final int MAX_BLOB_SIZE = 64512;
private final String tableName; protected final String tableName;
private final String createFileTableSQL; private final String createFileTableSQL;
@@ -33,7 +33,7 @@ public class GenericSQLProvider implements SQLProvider {
    private final String appendToFileSQL;
-   private final String readFileSQL;
+   private final String readLargeObjectSQL;
    private final String deleteFileSQL;
@@ -45,14 +45,25 @@ public class GenericSQLProvider implements SQLProvider {
    private final String dropFileTableSQL;
+   private final String createJournalTableSQL;
+   private final String insertJournalRecordsSQL;
+   private final String selectJournalRecordsSQL;
+   private final String deleteJournalRecordsSQL;
+   private final String deleteJournalTxRecordsSQL;
+   private final String countJournalRecordsSQL;
    public GenericSQLProvider(String tableName) {
       this.tableName = tableName;
       createFileTableSQL = "CREATE TABLE " + tableName +
          "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
-      insertFileSQL = "INSERT INTO " + tableName +
-         " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
+      insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
       selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
@@ -60,7 +71,7 @@ public class GenericSQLProvider implements SQLProvider {
       appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
-      readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
+      readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
       deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
@@ -72,6 +83,18 @@ public class GenericSQLProvider implements SQLProvider {
       copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
       dropFileTableSQL = "DROP TABLE " + tableName;
+      createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
+      insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
+      selectJournalRecordsSQL = "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
+      deleteJournalRecordsSQL = "DELETE FROM " + tableName + " WHERE id = ?";
+      deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
+      countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
    }
    @Override
@@ -84,6 +107,38 @@ public class GenericSQLProvider implements SQLProvider {
       return tableName;
    }
+   // Journal SQL Statements
+   @Override
+   public String getCreateJournalTableSQL() {
+      return createJournalTableSQL;
+   }
+   @Override
+   public String getInsertJournalRecordsSQL() {
+      return insertJournalRecordsSQL;
+   }
+   @Override
+   public String getSelectJournalRecordsSQL() {
+      return selectJournalRecordsSQL;
+   }
+   @Override
+   public String getDeleteJournalRecordsSQL() {
+      return deleteJournalRecordsSQL;
+   }
+   @Override
+   public String getDeleteJournalTxRecordsSQL() {
+      return deleteJournalTxRecordsSQL;
+   }
+   @Override
+   public String getCountJournalRecordsSQL() {
+      return countJournalRecordsSQL;
+   }
+   // Large Message Statements
    @Override
    public String getCreateFileTableSQL() {
       return createFileTableSQL;
@@ -105,13 +160,13 @@ public class GenericSQLProvider implements SQLProvider {
    }
    @Override
-   public String getAppendToFileSQL() {
+   public String getAppendToLargeObjectSQL() {
       return appendToFileSQL;
    }
    @Override
-   public String getReadFileSQL() {
+   public String getReadLargeObjectSQL() {
-      return readFileSQL;
+      return readLargeObjectSQL;
    }
    @Override
@@ -139,5 +194,8 @@ public class GenericSQLProvider implements SQLProvider {
       return dropFileTableSQL;
    }
+   @Override
+   public boolean closeConnectionOnShutdown() {
+      return true;
+   }
 }
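Note: the journal statements added to GenericSQLProvider above are plain parameterised SQL strings, so ordinary JDBC code can prepare and execute them directly. The following is a minimal sketch of that idea; the wrapper class, table name and connection handling are illustrative assumptions, not code from this commit.

   import java.sql.Connection;
   import java.sql.DriverManager;
   import java.sql.PreparedStatement;
   import java.sql.ResultSet;

   import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
   import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;

   // Illustrative only: count the rows in a journal table using the generic provider.
   public class JournalRecordCountExample {

      public static long countJournalRecords(String jdbcConnectionUrl, String tableName) throws Exception {
         SQLProvider provider = new GenericSQLProvider(tableName);
         try (Connection connection = DriverManager.getConnection(jdbcConnectionUrl);
              PreparedStatement count = connection.prepareStatement(provider.getCountJournalRecordsSQL());
              ResultSet rs = count.executeQuery()) {
            rs.next();
            return rs.getLong(1); // SELECT COUNT(*) FROM <tableName> returns a single column
         }
      }
   }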
@@ -14,12 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.jdbc.store.file.sql;
+package org.apache.activemq.artemis.jdbc.store.sql;
 public interface SQLProvider {
    int getMaxBlobSize();
+   String getCreateJournalTableSQL();
+   String getInsertJournalRecordsSQL();
+   String getSelectJournalRecordsSQL();
+   String getDeleteJournalRecordsSQL();
+   String getDeleteJournalTxRecordsSQL();
    String getTableName();
    String getCreateFileTableSQL();
@@ -30,9 +40,9 @@ public interface SQLProvider {
    String getSelectFileByFileName();
-   String getAppendToFileSQL();
-   String getReadFileSQL();
+   String getAppendToLargeObjectSQL();
+   String getReadLargeObjectSQL();
    String getDeleteFileSQL();
@@ -43,4 +53,8 @@ public interface SQLProvider {
    String getDropFileTableSQL();
    String getCloneFileRecordByIdSQL();
+   String getCountJournalRecordsSQL();
+   boolean closeConnectionOnShutdown();
 }
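With the journal getters and the renamed large-object accessors now part of the SQLProvider contract, supporting another database is mostly a matter of extending the generic provider and overriding what differs. A hypothetical sketch follows; the class name and the choice of overridden methods are assumptions, not code from this commit.

   import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;

   // Hypothetical dialect provider: override only the statements whose syntax differs.
   public class ExampleDialectSQLProvider extends GenericSQLProvider {

      private final String readLargeObjectSQL;

      public ExampleDialectSQLProvider(String tableName) {
         super(tableName);
         // A real dialect would swap in its own large-object access syntax here.
         readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
      }

      @Override
      public String getReadLargeObjectSQL() {
         return readLargeObjectSQL;
      }

      @Override
      public boolean closeConnectionOnShutdown() {
         // The generic provider returns true; a dialect whose engine should stay up
         // between factory restarts could presumably return false here instead.
         return true;
      }
   }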
@@ -63,7 +63,7 @@ public class JDBCSequentialFileFactoryTest {
    }
    @After
-   public void tearDown() throws SQLException {
+   public void tearDown() throws Exception {
       factory.destroy();
    }
@@ -126,8 +126,8 @@ public class JDBCSequentialFileFactoryTest {
       JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
       file.copyTo(copy);
-      checkData(copy, src);
       checkData(file, src);
+      checkData(copy, src);
    }
@@ -145,7 +145,12 @@ public class JDBCSequentialFileFactoryTest {
       IOCallbackCountdown callback = new IOCallbackCountdown(1);
       file.internalWrite(src, callback);
+      assertEquals(bufferSize, file.size());
       JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
+      copy.open();
+      assertEquals(bufferSize, copy.size());
+      assertEquals(bufferSize, file.size());
    }
    private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException {
@@ -88,10 +88,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
       beforeStop();
-      ((JDBCJournalImpl) bindingsJournal).stop(false);
-      ((JDBCJournalImpl) messageJournal).stop(false);
+      bindingsJournal.stop();
+      messageJournal.stop();
       largeMessagesFactory.stop();
       singleThreadExecutor.shutdown();
@@ -40,6 +40,10 @@ import java.lang.management.ManagementFactory;
 import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
 import java.net.ServerSocket;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -124,6 +128,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@@ -448,12 +454,34 @@ public abstract class ActiveMQTestBase extends Assert {
       DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
       dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
       dbStorageConfiguration.setBindingsTableName("BINDINGS");
-      dbStorageConfiguration.setMessageTableName("MESSAGES");
-      dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
+      dbStorageConfiguration.setMessageTableName("MESSAGE");
+      dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE");
+      dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName());
       configuration.setStoreConfiguration(dbStorageConfiguration);
    }
+   public void destroyTables(List<String> tableNames) throws Exception {
+      Driver driver = JDBCUtils.getDriver(getJDBCClassName());
+      Connection connection = driver.connect(getTestJDBCConnectionUrl(), null);
+      Statement statement = connection.createStatement();
+      try {
+         for (String tableName : tableNames) {
+            SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName);
+            ResultSet rs = connection.getMetaData().getTables(null, null, sqlProvider.getTableName(), null);
+            if (rs.next()) {
+               statement.execute("DROP TABLE " + sqlProvider.getTableName());
+            }
+         }
+      }
+      catch (Throwable e) {
+         e.printStackTrace();
+      }
+      finally {
+         connection.close();
+      }
+   }
    protected Map<String, Object> generateInVMParams(final int node) {
       Map<String, Object> params = new HashMap<>();
@@ -797,7 +825,11 @@ public abstract class ActiveMQTestBase extends Assert {
    }
    protected final String getTestJDBCConnectionUrl() {
-      return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true";
+      return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true");
+   }
+
+   protected final String getJDBCClassName() {
+      return System.getProperty("jdbc.driver.class","org.apache.derby.jdbc.EmbeddedDriver");
    }
    protected final File getTestDirfile() {
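Because getTestJDBCConnectionUrl() and getJDBCClassName() now read system properties first, the database-backed tests can be pointed at an external database without code changes, for example by passing -Djdbc.connection.url=... and -Djdbc.driver.class=... to the test JVM. The snippet below shows the same overrides set programmatically; the PostgreSQL URL and database name are placeholders, not values taken from this commit.

   // Illustrative only: route the JDBC store tests at an external PostgreSQL instance.
   public final class ExternalDatabaseTestConfig {

      public static void usePostgres() {
         // The property names come from ActiveMQTestBase; URL and driver values are placeholders.
         System.setProperty("jdbc.connection.url", "jdbc:postgresql://localhost:5432/artemis_tests");
         System.setProperty("jdbc.driver.class", "org.postgresql.Driver");
      }
   }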
@@ -185,6 +185,10 @@ public class ThreadLeakCheckRule extends ExternalResource {
          // The derby engine is initialized once, and lasts the lifetime of the VM
          return true;
       }
+      else if (threadName.contains("Abandoned connection cleanup thread")) {
+         // MySQL Engine checks for abandoned connections
+         return true;
+      }
       else if (threadName.contains("Timer")) {
          // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit.
          return true;
@@ -229,6 +229,14 @@
          <!-- Eclipse Public License - v 1.0 -->
       </dependency>
+      <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+         <version>9.4-1205-jdbc4</version>
+         <scope>provided</scope>
+         <!-- postgresql license -->
+      </dependency>
       <dependency>
          <groupId>commons-collections</groupId>
          <artifactId>commons-collections-testframework</artifactId>
@@ -240,11 +240,20 @@
          <artifactId>jboss-javaee</artifactId>
          <version>5.0.0.GA</version>
       </dependency>
+      <!-- DB Test Deps -->
       <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
-         <version>${apache.derby.version}</version>
+         <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+         <scope>test</scope>
+      </dependency>
       <!--Vertx provided dependencies-->
       <dependency>
          <groupId>io.vertx</groupId>
@@ -79,6 +79,13 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
       this.storeType = storeType;
    }
+   public void tearDown() throws Exception {
+      super.tearDown();
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+      }
+   }
    @Parameterized.Parameters(name = "storeType={0}")
    public static Collection<Object[]> data() {
       Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
@@ -40,7 +40,6 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-
       mapExpectedSets = new HashMap<>();
    }
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.persistence;
-import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -105,15 +103,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
          jmsJournal = null;
       }
-      // Stops the database engine early to stop thread leaks showing.
-      if (storeType == StoreConfiguration.StoreType.DATABASE) {
-         try {
-            DriverManager.getConnection("jdbc:derby:;shutdown=true");
-         }
-         catch (SQLException e) {
-         }
-      }
+      destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"}));
       super.tearDown();
       if (exception != null)
          throw exception;
@@ -125,6 +125,9 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
    public void tearDown() throws Exception {
       MBeanServerFactory.releaseMBeanServer(mbeanServer);
       super.tearDown();
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE"));
+      }
    }
    @Test
@@ -111,6 +111,12 @@
          <groupId>org.apache.geronimo.specs</groupId>
          <artifactId>geronimo-jms_2.0_spec</artifactId>
       </dependency>
+      <dependency>
+         <groupId>org.apache.derby</groupId>
+         <artifactId>derby</artifactId>
+         <version>${apache.derby.version}</version>
+         <scope>test</scope>
+      </dependency>
       <!-- this is for the log assertion -->
       <dependency>