diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml index 86fe1a6e5b..80f69c4270 100644 --- a/artemis-jdbc-store/pom.xml +++ b/artemis-jdbc-store/pom.xml @@ -53,12 +53,19 @@ test + org.apache.derby derby test + + org.postgresql + postgresql + provided + + org.apache.activemq artemis-journal diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java index bc04ab9316..8ce08c64e3 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java @@ -23,9 +23,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider; -import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider; -import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider; +import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver; +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; public class JDBCUtils { @@ -69,8 +73,45 @@ public class JDBCUtils { if (driverClass.contains("derby")) { return new DerbySQLProvider(tableName); } + else if (driverClass.contains("postgres")) { + return new PostgresSQLProvider(tableName); + } + else if (driverClass.contains("mysql")) { + return new MySQLSQLProvider(tableName); + } else { return new GenericSQLProvider(tableName); } } + + public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass, + String tableName, + String jdbcConnectionUrl) throws SQLException { + JDBCSequentialFileFactoryDriver dbDriver; + if (driverClass.contains("derby")) { + dbDriver = new JDBCSequentialFileFactoryDriver(); + dbDriver.setSqlProvider(new DerbySQLProvider(tableName)); + dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); + dbDriver.setJdbcDriverClass(driverClass); + } + else if (driverClass.contains("postgres")) { + dbDriver = new PostgresSequentialSequentialFileDriver(); + dbDriver.setSqlProvider(new PostgresSQLProvider(tableName)); + dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); + dbDriver.setJdbcDriverClass(driverClass); + } + else if (driverClass.contains("mysql")) { + dbDriver = new JDBCSequentialFileFactoryDriver(); + dbDriver.setSqlProvider(new MySQLSQLProvider(tableName)); + dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); + dbDriver.setJdbcDriverClass(driverClass); + } + else { + dbDriver = new JDBCSequentialFileFactoryDriver(); + dbDriver.setSqlProvider(new GenericSQLProvider(tableName)); + dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl); + dbDriver.setJdbcDriverClass(driverClass); + } + return dbDriver; + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java new file mode 100644 index 0000000000..6d8be716fa --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +/** + * Class to hold common database functionality such as drivers and connections + */ +public abstract class AbstractJDBCDriver { + + protected Connection connection; + + protected SQLProvider sqlProvider; + + protected String jdbcConnectionUrl; + + protected String jdbcDriverClass; + + protected Driver dbDriver; + + public AbstractJDBCDriver() { + } + + public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { + this.jdbcConnectionUrl = jdbcConnectionUrl; + this.jdbcDriverClass = jdbcDriverClass; + this.sqlProvider = JDBCUtils.getSQLProvider(jdbcDriverClass, tableName); + } + + public void start() throws Exception { + connect(); + createSchema(); + prepareStatements(); + } + + public void stop() throws SQLException { + if (sqlProvider.closeConnectionOnShutdown()) { + connection.close(); + } + } + + protected abstract void prepareStatements() throws SQLException; + + protected abstract void createSchema() throws SQLException; + + protected void createTable(String schemaSql) throws SQLException { + JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), schemaSql); + } + + protected void connect() throws Exception { + try { + dbDriver = JDBCUtils.getDriver(jdbcDriverClass); + connection = dbDriver.connect(jdbcConnectionUrl, new Properties()); + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcConnectionUrl); + throw new RuntimeException("Error connecting to database", e); + } + } + + public void destroy() throws Exception { + try { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName()); + statement.close(); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + public Connection getConnection() { + return connection; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public SQLProvider getSqlProvider() { + return sqlProvider; + } + + public void setSqlProvider(SQLProvider sqlProvider) { + this.sqlProvider = sqlProvider; + } + + public String getJdbcConnectionUrl() { + return jdbcConnectionUrl; + } + + public void setJdbcConnectionUrl(String jdbcConnectionUrl) { + this.jdbcConnectionUrl = jdbcConnectionUrl; + } + + public String getJdbcDriverClass() { + return jdbcDriverClass; + } + + public void setJdbcDriverClass(String jdbcDriverClass) { + this.jdbcDriverClass = jdbcDriverClass; + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java similarity index 86% rename from artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java rename to artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java index c14036ebbc..d9cbed4a7b 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/DerbySQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/derby/DerbySQLProvider.java @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.jdbc.store.file.sql; +package org.apache.activemq.artemis.jdbc.store.drivers.derby; + +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; public class DerbySQLProvider extends GenericSQLProvider { @@ -46,7 +48,12 @@ public class DerbySQLProvider extends GenericSQLProvider { } @Override - public String getAppendToFileSQL() { + public String getAppendToLargeObjectSQL() { return appendToFileSQL; } + + @Override + public boolean closeConnectionOnShutdown() { + return false; + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java new file mode 100644 index 0000000000..1400382c0f --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/mysql/MySQLSQLProvider.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers.mysql; + +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; + +public class MySQLSQLProvider extends GenericSQLProvider { + + private static final int MAX_BLOB_SIZE = 4 * 1024 * 1024 * 1024; // 4GB + + private final String createFileTableSQL; + + private final String createJournalTableSQL; + + private final String copyFileRecordByIdSQL; + + public MySQLSQLProvider(String tName) { + super(tName.toLowerCase()); + + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID INTEGER NOT NULL AUTO_INCREMENT," + + "FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA LONGBLOB, PRIMARY KEY(ID)) ENGINE=InnoDB;"; + + createJournalTableSQL = "CREATE TABLE " + tableName + + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB;"; + + copyFileRecordByIdSQL = " UPDATE " + tableName + ", (SELECT DATA AS FROM_DATA FROM " + tableName + + " WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?;"; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getCreateJournalTableSQL() { + return createJournalTableSQL; + } + + @Override + public String getCopyFileRecordByIdSQL() { + return copyFileRecordByIdSQL; + } +} \ No newline at end of file diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java new file mode 100644 index 0000000000..664202b606 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers.postgres; + +import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider; + +public class PostgresSQLProvider extends GenericSQLProvider { + + // BYTEA Size used in Journal + private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB + + private final String createFileTableSQL; + + private final String createJournalTableSQL; + + public PostgresSQLProvider(String tName) { + super(tName.toLowerCase()); + createFileTableSQL = "CREATE TABLE " + tableName + + "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))"; + + createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)"; + } + + @Override + public String getCreateFileTableSQL() { + return createFileTableSQL; + } + + @Override + public String getCreateJournalTableSQL() { + return createJournalTableSQL; + } + + @Override + public int getMaxBlobSize() { + return MAX_BLOB_SIZE; + } +} + diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java new file mode 100644 index 0000000000..4d42d7f223 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.drivers.postgres; + +import java.nio.ByteBuffer; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver; +import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile; +import org.postgresql.PGConnection; +import org.postgresql.largeobject.LargeObject; +import org.postgresql.largeobject.LargeObjectManager; + +public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver { + + private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY"; + + public PostgresSequentialSequentialFileDriver() throws SQLException { + super(); + } + + @Override + public synchronized void createFile(JDBCSequentialFile file) throws SQLException { + try { + connection.setAutoCommit(false); + + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + long oid = lobjManager.createLO(); + + createFile.setString(1, file.getFileName()); + createFile.setString(2, file.getExtension()); + createFile.setLong(3, oid); + createFile.executeUpdate(); + + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + file.setId(keys.getInt(1)); + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + @Override + public synchronized void loadFile(JDBCSequentialFile file) throws SQLException { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.setWritePosition(getPostGresLargeObjectSize(file)); + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + @Override + public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + LargeObject largeObject = null; + + Long oid = getOID(file); + try { + connection.setAutoCommit(false); + largeObject = lobjManager.open(oid, LargeObjectManager.WRITE); + largeObject.seek(largeObject.size()); + largeObject.write(data); + largeObject.close(); + connection.commit(); + } + catch (Exception e) { + connection.rollback(); + throw e; + } + return data.length; + } + + @Override + public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + LargeObject largeObject = null; + long oid = getOID(file); + try { + connection.setAutoCommit(false); + largeObject = lobjManager.open(oid, LargeObjectManager.READ); + int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position()); + + if (readLength > 0) { + if (file.position() > 0) largeObject.seek((int) file.position()); + byte[] data = largeObject.read(readLength); + bytes.put(data); + } + + largeObject.close(); + connection.commit(); + + return readLength; + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + private synchronized Long getOID(JDBCSequentialFile file) throws SQLException { + Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY); + if (oid == null) { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1)); + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) { + System.out.println("FD"); + } + return (Long) file.getMetaData(POSTGRES_OID_KEY); + } + + private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException { + LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI(); + + int size = 0; + Long oid = getOID(file); + if (oid != null) { + try { + connection.setAutoCommit(false); + LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ); + size = largeObject.size(); + largeObject.close(); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + return size; + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 73bec72314..5de876185a 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -19,24 +19,20 @@ package org.apache.activemq.artemis.jdbc.store.file; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.sql.Blob; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; -import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.jboss.logging.Logger; public class JDBCSequentialFile implements SequentialFile { @@ -53,20 +49,6 @@ public class JDBCSequentialFile implements SequentialFile { private int id = -1; - private final PreparedStatement appendToFile; - - private final PreparedStatement deleteFile; - - private final PreparedStatement readFile; - - private final PreparedStatement createFile; - - private final PreparedStatement selectFileByFileName; - - private final PreparedStatement copyFileRecord; - - private final PreparedStatement renameFile; - private long readPosition = 0; private long writePosition = 0; @@ -75,33 +57,28 @@ public class JDBCSequentialFile implements SequentialFile { private JDBCSequentialFileFactory fileFactory; - private int maxSize; - - private SQLProvider sqlProvider; - private final Object writeLock; + private final JDBCSequentialFileFactoryDriver dbDriver; + + // Allows DB Drivers to cache meta data. + private Map metaData = new ConcurrentHashMap<>(); + public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory, final String filename, - final SQLProvider sqlProvider, final Executor executor, + final JDBCSequentialFileFactoryDriver driver, final Object writeLock) throws SQLException { this.fileFactory = fileFactory; this.filename = filename; this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : ""; this.executor = executor; - this.maxSize = sqlProvider.getMaxBlobSize(); - this.sqlProvider = sqlProvider; this.writeLock = writeLock; + this.dbDriver = driver; + } - Connection connection = fileFactory.getConnection(); - this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL()); - this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); - this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); - this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL()); - this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); - this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); - this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); + public void setWritePosition(int writePosition) { + this.writePosition = writePosition; } @Override @@ -117,35 +94,11 @@ public class JDBCSequentialFile implements SequentialFile { @Override public synchronized void open() throws Exception { if (!isOpen) { - try { - synchronized (writeLock) { - selectFileByFileName.setString(1, filename); - - try (ResultSet rs = selectFileByFileName.executeQuery()) { - if (!rs.next()) { - createFile.setString(1, filename); - createFile.setString(2, extension); - createFile.setBytes(3, new byte[0]); - createFile.executeUpdate(); - try (ResultSet keys = createFile.getGeneratedKeys()) { - keys.next(); - this.id = keys.getInt(1); - } - } - else { - this.id = rs.getInt(1); - this.writePosition = rs.getBlob(4).length(); - } - } - } + synchronized (writeLock) { + dbDriver.openFile(this); + isCreated = true; + isOpen = true; } - catch (SQLException e) { - ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e); - isOpen = false; - } - - isCreated = true; - isOpen = true; } } @@ -156,7 +109,7 @@ public class JDBCSequentialFile implements SequentialFile { @Override public boolean fits(int size) { - return writePosition + size <= maxSize; + return writePosition + size <= dbDriver.getMaxSize(); } @Override @@ -183,24 +136,20 @@ public class JDBCSequentialFile implements SequentialFile { public void delete() throws IOException, InterruptedException, ActiveMQException { try { if (isCreated) { - deleteFile.setInt(1, id); - deleteFile.executeUpdate(); + synchronized (writeLock) { + dbDriver.deleteFile(this); + } } } catch (SQLException e) { - throw new IOException(e); + throw new ActiveMQException(ActiveMQExceptionType.IO_ERROR, e.getMessage(), e); } } private synchronized int internalWrite(byte[] data, IOCallback callback) { try { synchronized (writeLock) { - int noBytes = data.length; - appendToFile.setBytes(1, data); - appendToFile.setInt(2, id); - int result = appendToFile.executeUpdate(); - if (result < 1) - throw new ActiveMQException("No record found for file id: " + id); + int noBytes = dbDriver.writeToFile(this, data); seek(noBytes); if (callback != null) callback.done(); @@ -295,36 +244,19 @@ public class JDBCSequentialFile implements SequentialFile { } @Override - public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException { + public synchronized int read(ByteBuffer bytes, final IOCallback callback) throws SQLException { synchronized (writeLock) { - readFile.setInt(1, id); - try (ResultSet rs = readFile.executeQuery()) { - if (rs.next()) { - Blob blob = rs.getBlob(1); - - long bytesRemaining = blob.length() - readPosition; - byte[] data; - if (bytesRemaining > bytes.remaining()) { - // First index into blob is 1 (not 0) - data = blob.getBytes(readPosition + 1, bytes.remaining()); - } - else { - // First index into blob is 1 (not 0) - data = blob.getBytes(readPosition + 1, (int) bytesRemaining); - } - - bytes.put(data); - readPosition += data.length; - if (callback != null) - callback.done(); - - return data.length; - } - return 0; + try { + int read = dbDriver.readFromFile(this, bytes); + readPosition += read; + if (callback != null) + callback.done(); + return read; } catch (Exception e) { if (callback != null) callback.onError(-1, e.getMessage()); + e.printStackTrace(); return 0; } } @@ -352,8 +284,20 @@ public class JDBCSequentialFile implements SequentialFile { @Override public void sync() throws IOException { - // (mtaylor) We always write straight away, so we don't need to do anything here. - // (mtaylor) Is this meant to be blocking? + final SimpleWaitIOCallback callback = new SimpleWaitIOCallback(); + executor.execute(new Runnable() { + @Override + public void run() { + callback.done(); + } + }); + + try { + callback.waitCompletion(); + } + catch (Exception e) { + throw new IOException(e); + } } @Override @@ -363,15 +307,15 @@ public class JDBCSequentialFile implements SequentialFile { @Override public void renameTo(String newFileName) throws Exception { - renameFile.setString(1, newFileName); - renameFile.setInt(2, id); - renameFile.executeUpdate(); + synchronized (writeLock) { + dbDriver.renameFile(this, newFileName); + } } @Override public SequentialFile cloneFile() { try { - JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock); + JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, executor, dbDriver, writeLock); return clone; } catch (Exception e) { @@ -385,9 +329,9 @@ public class JDBCSequentialFile implements SequentialFile { JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile; clone.open(); - copyFileRecord.setInt(1, id); - copyFileRecord.setInt(2, clone.getId()); - copyFileRecord.executeUpdate(); + synchronized (writeLock) { + dbDriver.copyFileData(this, clone); + } } public int getId() { @@ -416,4 +360,16 @@ public class JDBCSequentialFile implements SequentialFile { public File getJavaFile() { return null; } + + public void addMetaData(Object key, Object value) { + metaData.put(key, value); + } + + public Object removeMetaData(Object key) { + return metaData.remove(key); + } + + public Object getMetaData(Object key) { + return metaData.get(key); + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index 4231907797..34547578ad 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -18,17 +18,11 @@ package org.apache.activemq.artemis.jdbc.store.file; import java.io.File; import java.nio.ByteBuffer; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.Executor; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -36,53 +30,59 @@ import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.jdbc.store.JDBCUtils; -import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent { - private Connection connection; - - private String connectionUrl; - - private final Driver driver; - private boolean started; - private final String tableName; - private List files; - private PreparedStatement selectFileNamesByExtension; - private Executor executor; - private SQLProvider sqlProvider; - private Map fileLocks = new HashMap<>(); + private final JDBCSequentialFileFactoryDriver dbDriver; + public JDBCSequentialFileFactory(final String connectionUrl, final String tableName, final String className, Executor executor) throws Exception { - this.connectionUrl = connectionUrl; this.executor = executor; - this.tableName = tableName.toUpperCase(); - files = new ArrayList<>(); - sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName); - driver = JDBCUtils.getDriver(className); + dbDriver = JDBCUtils.getDBFileDriver(className, tableName, connectionUrl); } - public Connection getConnection() { - return connection; + @Override + public synchronized void start() { + try { + if (!started) { + dbDriver.start(); + started = true; + } + } + catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database"); + started = false; + } + } + + @Override + public synchronized void stop() { + try { + dbDriver.stop(); + } + catch (SQLException e) { + ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection"); + } + started = false; } @Override public SequentialFile createSequentialFile(String fileName) { try { fileLocks.putIfAbsent(fileName, new Object()); - JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName)); + JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, executor, dbDriver, fileLocks.get(fileName)); files.add(file); return file; } @@ -99,15 +99,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public List listFiles(String extension) throws Exception { - List fileNames = new ArrayList<>(); - - selectFileNamesByExtension.setString(1, extension); - try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { - while (rs.next()) { - fileNames.add(rs.getString(1)); - } - } - return fileNames; + return dbDriver.listFiles(extension); } @Override @@ -171,7 +163,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void activateBuffer(SequentialFile file) { - } @Override @@ -179,34 +170,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM return null; } - @Override - public synchronized void start() { - try { - if (!started) { - connection = driver.connect(connectionUrl, new Properties()); - JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL()); - selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); - started = true; - } - } - catch (SQLException e) { - ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database"); - started = false; - } - } - - @Override - public synchronized void stop() { - try { - if (false) - connection.close(); - } - catch (SQLException e) { - ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection"); - } - started = false; - } - @Override public boolean isStarted() { return started; @@ -218,12 +181,9 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM @Override public void flush() { - } public synchronized void destroy() throws SQLException { - Statement statement = connection.createStatement(); - statement.executeUpdate(sqlProvider.getDropFileTableSQL()); - stop(); + dbDriver.destroy(); } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java new file mode 100644 index 0000000000..f8ad06b4a2 --- /dev/null +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jdbc.store.file; + +import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; + +public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver { + + protected PreparedStatement deleteFile; + + protected PreparedStatement createFile; + + protected PreparedStatement selectFileByFileName; + + protected PreparedStatement copyFileRecord; + + protected PreparedStatement renameFile; + + protected PreparedStatement readLargeObject; + + protected PreparedStatement appendToLargeObject; + + protected PreparedStatement selectFileNamesByExtension; + + public JDBCSequentialFileFactoryDriver() { + super(); + } + + public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) { + super(tableName, jdbcConnectionUrl, jdbcDriverClass); + } + + public void start() throws Exception { + super.start(); + } + + @Override + protected void createSchema() throws SQLException { + createTable(sqlProvider.getCreateFileTableSQL()); + } + + @Override + protected void prepareStatements() throws SQLException { + this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL()); + this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS); + this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName()); + this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL()); + this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL()); + this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL()); + this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL()); + this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL()); + } + + public synchronized List listFiles(String extension) throws Exception { + List fileNames = new ArrayList<>(); + try { + connection.setAutoCommit(false); + selectFileNamesByExtension.setString(1, extension); + try (ResultSet rs = selectFileNamesByExtension.executeQuery()) { + while (rs.next()) { + fileNames.add(rs.getString(1)); + } + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + return fileNames; + } + + /** + * Opens the supplied file. If the file does not exist in the database it will create a new one. + * + * @param file + * @return + * @throws SQLException + */ + public void openFile(JDBCSequentialFile file) throws SQLException { + int fileId = fileExists(file); + if (fileId < 0) { + createFile(file); + } + else { + file.setId(fileId); + loadFile(file); + } + } + + /** + * Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1. + * + * @param file + * @return + * @throws SQLException + */ + public synchronized int fileExists(JDBCSequentialFile file) throws SQLException { + connection.setAutoCommit(false); + selectFileByFileName.setString(1, file.getFileName()); + try (ResultSet rs = selectFileByFileName.executeQuery()) { + int id = rs.next() ? rs.getInt(1) : -1; + connection.commit(); + return id; + } + catch (Exception e) { + connection.rollback(); + throw e; + } + } + + /** + * Loads an existing file. + * + * @param file + * @throws SQLException + */ + public synchronized void loadFile(JDBCSequentialFile file) throws SQLException { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + file.setWritePosition((int) rs.getBlob(1).length()); + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Creates a new database row representing the supplied file. + * + * @param file + * @throws SQLException + */ + public synchronized void createFile(JDBCSequentialFile file) throws SQLException { + try { + connection.setAutoCommit(false); + createFile.setString(1, file.getFileName()); + createFile.setString(2, file.getExtension()); + createFile.setBytes(3, new byte[0]); + createFile.executeUpdate(); + try (ResultSet keys = createFile.getGeneratedKeys()) { + keys.next(); + file.setId(keys.getInt(1)); + } + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Updates the fileName field to the new value. + * + * @param file + * @param newFileName + * @throws SQLException + */ + public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException { + try { + connection.setAutoCommit(false); + renameFile.setString(1, newFileName); + renameFile.setInt(2, file.getId()); + renameFile.executeUpdate(); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Deletes the associated row in the database. + * + * @param file + * @throws SQLException + */ + public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException { + try { + connection.setAutoCommit(false); + deleteFile.setInt(1, file.getId()); + deleteFile.executeUpdate(); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Persists data to this files associated database mapping. + * + * @param file + * @param data + * @return + * @throws Exception + */ + public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException { + try { + connection.setAutoCommit(false); + appendToLargeObject.setBytes(1, data); + appendToLargeObject.setInt(2, file.getId()); + appendToLargeObject.executeUpdate(); + connection.commit(); + return data.length; + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Reads data from the file (at file.readPosition) into the byteBuffer. + * + * @param file + * @param bytes + * @return + * @throws Exception + */ + public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException { + connection.setAutoCommit(false); + readLargeObject.setInt(1, file.getId()); + int readLength = 0; + try (ResultSet rs = readLargeObject.executeQuery()) { + if (rs.next()) { + Blob blob = rs.getBlob(1); + readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position()); + byte[] data = blob.getBytes(file.position() + 1, (int) readLength); + bytes.put(data); + } + connection.commit(); + return readLength; + } + catch (Throwable e) { + connection.rollback(); + throw e; + } + } + + /** + * Copy the data content of FileFrom to FileTo + * + * @param fileFrom + * @param fileTo + * @throws SQLException + */ + public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException { + try { + connection.setAutoCommit(false); + copyFileRecord.setInt(1, fileFrom.getId()); + copyFileRecord.setInt(2, fileTo.getId()); + copyFileRecord.executeUpdate(); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + /** + * Drop all tables and data + */ + public synchronized void destroy() throws SQLException { + try { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + statement.executeUpdate(sqlProvider.getDropFileTableSQL()); + connection.commit(); + } + catch (SQLException e) { + connection.rollback(); + throw e; + } + } + + public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) { + long bytesRemaining = objectLength - readPosition; + if (bytesRemaining > bufferSpace) { + return bufferSpace; + } + else { + return bytesRemaining; + } + } + + public int getMaxSize() { + return sqlProvider.getMaxBlobSize(); + } +} diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index f25316780c..6c051125ed 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -17,21 +17,15 @@ package org.apache.activemq.artemis.jdbc.store.journal; -import java.sql.Connection; -import java.sql.Driver; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; @@ -44,23 +38,18 @@ import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; -import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; -public class JDBCJournalImpl implements Journal { +public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // Sync Delay in ms public static final int SYNC_DELAY = 5; private static int USER_VERSION = 1; - private final String tableName; - - private final String jdbcDriverClass; - - private Connection connection; - - private List records; + private final List records; private PreparedStatement insertJournalRecords; @@ -74,13 +63,9 @@ public class JDBCJournalImpl implements Journal { private boolean started; - private String jdbcUrl; - private Timer syncTimer; - private Driver dbDriver; - - private final ReadWriteLock journalLock = new ReentrantReadWriteLock(); + private final Object journalLock = new Object(); private final String timerThread; @@ -90,68 +75,49 @@ public class JDBCJournalImpl implements Journal { // Sequence ID for journal records private AtomicLong seq = new AtomicLong(0); - public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) { - this.tableName = tableName; - this.jdbcUrl = jdbcUrl; - this.jdbcDriverClass = jdbcDriverClass; - timerThread = "Timer JDBC Journal(" + tableName + ")"; + private Logger logger = Logger.getLogger(this.getClass()); + public JDBCJournalImpl(String jdbcUrl, String tableName, String jdbcDriverClass) { + super(tableName, jdbcUrl, jdbcDriverClass); + timerThread = "Timer JDBC Journal(" + tableName + ")"; records = new ArrayList<>(); } @Override public void start() throws Exception { - dbDriver = JDBCUtils.getDriver(jdbcDriverClass); - - try { - connection = dbDriver.connect(jdbcUrl, new Properties()); - } - catch (SQLException e) { - ActiveMQJournalLogger.LOGGER.error("Unable to connect to database using URL: " + jdbcUrl); - throw new RuntimeException("Error connecting to database", e); - } - - JDBCUtils.createTableIfNotExists(connection, tableName, JDBCJournalRecord.createTableSQL(tableName)); - - insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName)); - selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName)); - countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName); - deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName)); - deleteJournalTxRecords = connection.prepareStatement(JDBCJournalRecord.deleteJournalTxRecordsSQL(tableName)); - + super.start(); syncTimer = new Timer(timerThread, true); syncTimer.schedule(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY); - started = true; } - @Override - public void stop() throws Exception { - stop(true); + protected void createSchema() throws SQLException { + createTable(sqlProvider.getCreateJournalTableSQL()); } - public synchronized void stop(boolean shutdownConnection) throws Exception { + @Override + protected void prepareStatements() throws SQLException { + insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL()); + selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL()); + countJournalRecords = connection.prepareStatement(sqlProvider.getCountJournalRecordsSQL()); + deleteJournalRecords = connection.prepareStatement(sqlProvider.getDeleteJournalRecordsSQL()); + deleteJournalTxRecords = connection.prepareStatement(sqlProvider.getDeleteJournalTxRecordsSQL()); + } + + @Override + public synchronized void stop() throws SQLException { if (started) { - journalLock.writeLock().lock(); - - syncTimer.cancel(); - - sync(); - if (shutdownConnection) { - connection.close(); + synchronized (journalLock) { + syncTimer.cancel(); + sync(); + started = false; + super.stop(); } - - started = false; - journalLock.writeLock().unlock(); } } public synchronized void destroy() throws Exception { - connection.setAutoCommit(false); - Statement statement = connection.createStatement(); - statement.executeUpdate("DROP TABLE " + tableName); - statement.close(); - connection.commit(); + super.destroy(); stop(); } @@ -159,8 +125,11 @@ public class JDBCJournalImpl implements Journal { if (!started) return 0; - List recordRef = records; - records = new ArrayList(); + List recordRef = new ArrayList<>(); + synchronized (records) { + recordRef.addAll(records); + records.clear(); + } // We keep a list of deleted records and committed tx (used for cleaning up old transaction data). List deletedRecords = new ArrayList<>(); @@ -215,12 +184,18 @@ public class JDBCJournalImpl implements Journal { deleteJournalTxRecords.executeBatch(); connection.commit(); - - cleanupTxRecords(deletedRecords, committedTransactions); success = true; } catch (SQLException e) { - performRollback(connection, recordRef); + performRollback(recordRef); + } + + try { + if (success) + cleanupTxRecords(deletedRecords, committedTransactions); + } + catch (SQLException e) { + e.printStackTrace(); } executeCallbacks(recordRef, success); @@ -230,12 +205,11 @@ public class JDBCJournalImpl implements Journal { /* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted, we remove the Tx Records (i.e. PREPARE, COMMIT). */ private synchronized void cleanupTxRecords(List deletedRecords, List committedTx) throws SQLException { - + connection.rollback(); List iterableCopy; List iterableCopyTx = new ArrayList<>(); iterableCopyTx.addAll(transactions.values()); - for (Long txId : committedTx) { transactions.get(txId).committed = true; } @@ -260,9 +234,8 @@ public class JDBCJournalImpl implements Journal { } } - private void performRollback(Connection connection, List records) { + private void performRollback(List records) { try { - connection.rollback(); for (JDBCJournalRecord record : records) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { removeTxRecord(record); @@ -306,18 +279,18 @@ public class JDBCJournalImpl implements Journal { record.setIoCompletion(callback); } - try { - journalLock.writeLock().lock(); + synchronized (journalLock) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { addTxRecord(record); } - records.add(record); - } - finally { - journalLock.writeLock().unlock(); + + synchronized (records) { + records.add(record); + } } - if (callback != null) callback.waitCompletion(); + if (callback != null) + callback.waitCompletion(); } private synchronized void addTxRecord(JDBCJournalRecord record) { @@ -703,12 +676,12 @@ public class JDBCJournalImpl implements Journal { @Override public final void synchronizationLock() { - journalLock.writeLock().lock(); + logger.error("Replication is not supported with JDBC Store"); } @Override public final void synchronizationUnlock() { - journalLock.writeLock().unlock(); + logger.error("Replication is not supported with JDBC Store"); } @Override diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java similarity index 59% rename from artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java rename to artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java index c95edb3d53..6efa170b64 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/GenericSQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/GenericSQLProvider.java @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.jdbc.store.file.sql; +package org.apache.activemq.artemis.jdbc.store.sql; public class GenericSQLProvider implements SQLProvider { // Default to lowest (MYSQL = 64k) private static final int MAX_BLOB_SIZE = 64512; - private final String tableName; + protected final String tableName; private final String createFileTableSQL; @@ -33,7 +33,7 @@ public class GenericSQLProvider implements SQLProvider { private final String appendToFileSQL; - private final String readFileSQL; + private final String readLargeObjectSQL; private final String deleteFileSQL; @@ -45,14 +45,25 @@ public class GenericSQLProvider implements SQLProvider { private final String dropFileTableSQL; + private final String createJournalTableSQL; + + private final String insertJournalRecordsSQL; + + private final String selectJournalRecordsSQL; + + private final String deleteJournalRecordsSQL; + + private final String deleteJournalTxRecordsSQL; + + private final String countJournalRecordsSQL; + public GenericSQLProvider(String tableName) { this.tableName = tableName; createFileTableSQL = "CREATE TABLE " + tableName + "(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))"; - insertFileSQL = "INSERT INTO " + tableName + - " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)"; + insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)"; selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?"; @@ -60,7 +71,7 @@ public class GenericSQLProvider implements SQLProvider { appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?"; - readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?"; + readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?"; deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?"; @@ -72,6 +83,18 @@ public class GenericSQLProvider implements SQLProvider { copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?"; dropFileTableSQL = "DROP TABLE " + tableName; + + createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT)"; + + insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)"; + + selectJournalRecordsSQL = "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC"; + + deleteJournalRecordsSQL = "DELETE FROM " + tableName + " WHERE id = ?"; + + deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?"; + + countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName; } @Override @@ -84,6 +107,38 @@ public class GenericSQLProvider implements SQLProvider { return tableName; } + // Journal SQL Statements + @Override + public String getCreateJournalTableSQL() { + return createJournalTableSQL; + } + + @Override + public String getInsertJournalRecordsSQL() { + return insertJournalRecordsSQL; + } + + @Override + public String getSelectJournalRecordsSQL() { + return selectJournalRecordsSQL; + } + + @Override + public String getDeleteJournalRecordsSQL() { + return deleteJournalRecordsSQL; + } + + @Override + public String getDeleteJournalTxRecordsSQL() { + return deleteJournalTxRecordsSQL; + } + + @Override + public String getCountJournalRecordsSQL() { + return countJournalRecordsSQL; + } + + // Large Message Statements @Override public String getCreateFileTableSQL() { return createFileTableSQL; @@ -105,13 +160,13 @@ public class GenericSQLProvider implements SQLProvider { } @Override - public String getAppendToFileSQL() { + public String getAppendToLargeObjectSQL() { return appendToFileSQL; } @Override - public String getReadFileSQL() { - return readFileSQL; + public String getReadLargeObjectSQL() { + return readLargeObjectSQL; } @Override @@ -139,5 +194,8 @@ public class GenericSQLProvider implements SQLProvider { return dropFileTableSQL; } - + @Override + public boolean closeConnectionOnShutdown() { + return true; + } } diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java similarity index 74% rename from artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java rename to artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java index e9fe36c1b7..5645ebc87e 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/sql/SQLProvider.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/sql/SQLProvider.java @@ -14,12 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.jdbc.store.file.sql; +package org.apache.activemq.artemis.jdbc.store.sql; public interface SQLProvider { int getMaxBlobSize(); + String getCreateJournalTableSQL(); + + String getInsertJournalRecordsSQL(); + + String getSelectJournalRecordsSQL(); + + String getDeleteJournalRecordsSQL(); + + String getDeleteJournalTxRecordsSQL(); + String getTableName(); String getCreateFileTableSQL(); @@ -30,9 +40,9 @@ public interface SQLProvider { String getSelectFileByFileName(); - String getAppendToFileSQL(); + String getAppendToLargeObjectSQL(); - String getReadFileSQL(); + String getReadLargeObjectSQL(); String getDeleteFileSQL(); @@ -43,4 +53,8 @@ public interface SQLProvider { String getDropFileTableSQL(); String getCloneFileRecordByIdSQL(); + + String getCountJournalRecordsSQL(); + + boolean closeConnectionOnShutdown(); } diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java index bd200bd058..1a8fba4c9f 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/file/JDBCSequentialFileFactoryTest.java @@ -63,7 +63,7 @@ public class JDBCSequentialFileFactoryTest { } @After - public void tearDown() throws SQLException { + public void tearDown() throws Exception { factory.destroy(); } @@ -126,8 +126,8 @@ public class JDBCSequentialFileFactoryTest { JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt"); file.copyTo(copy); - checkData(copy, src); checkData(file, src); + checkData(copy, src); } @Test @@ -145,7 +145,12 @@ public class JDBCSequentialFileFactoryTest { IOCallbackCountdown callback = new IOCallbackCountdown(1); file.internalWrite(src, callback); + assertEquals(bufferSize, file.size()); JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile(); + copy.open(); + + assertEquals(bufferSize, copy.size()); + assertEquals(bufferSize, file.size()); } private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 25b7cd1bc2..a5d55ad7a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -88,10 +88,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager { beforeStop(); - ((JDBCJournalImpl) bindingsJournal).stop(false); - - ((JDBCJournalImpl) messageJournal).stop(false); - + bindingsJournal.stop(); + messageJournal.stop(); largeMessagesFactory.stop(); singleThreadExecutor.shutdown(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 86d8cf6841..31ad1d6504 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -40,6 +40,10 @@ import java.lang.management.ManagementFactory; import java.lang.ref.Reference; import java.lang.ref.WeakReference; import java.net.ServerSocket; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.ResultSet; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -124,6 +128,8 @@ import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivatio import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; +import org.apache.activemq.artemis.jdbc.store.JDBCUtils; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; @@ -448,12 +454,34 @@ public abstract class ActiveMQTestBase extends Assert { DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration(); dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl()); dbStorageConfiguration.setBindingsTableName("BINDINGS"); - dbStorageConfiguration.setMessageTableName("MESSAGES"); - dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver"); + dbStorageConfiguration.setMessageTableName("MESSAGE"); + dbStorageConfiguration.setLargeMessageTableName("LARGE_MESSAGE"); + dbStorageConfiguration.setJdbcDriverClassName(getJDBCClassName()); configuration.setStoreConfiguration(dbStorageConfiguration); } + public void destroyTables(List tableNames) throws Exception { + Driver driver = JDBCUtils.getDriver(getJDBCClassName()); + Connection connection = driver.connect(getTestJDBCConnectionUrl(), null); + Statement statement = connection.createStatement(); + try { + for (String tableName : tableNames) { + SQLProvider sqlProvider = JDBCUtils.getSQLProvider(getJDBCClassName(), tableName); + ResultSet rs = connection.getMetaData().getTables(null, null, sqlProvider.getTableName(), null); + if (rs.next()) { + statement.execute("DROP TABLE " + sqlProvider.getTableName()); + } + } + } + catch (Throwable e) { + e.printStackTrace(); + } + finally { + connection.close(); + } + } + protected Map generateInVMParams(final int node) { Map params = new HashMap<>(); @@ -797,7 +825,11 @@ public abstract class ActiveMQTestBase extends Assert { } protected final String getTestJDBCConnectionUrl() { - return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true"; + return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true"); + } + + protected final String getJDBCClassName() { + return System.getProperty("jdbc.driver.class","org.apache.derby.jdbc.EmbeddedDriver"); } protected final File getTestDirfile() { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java index 3b53d53253..fa881d5352 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java @@ -185,6 +185,10 @@ public class ThreadLeakCheckRule extends ExternalResource { // The derby engine is initialized once, and lasts the lifetime of the VM return true; } + else if (threadName.contains("Abandoned connection cleanup thread")) { + // MySQL Engine checks for abandoned connections + return true; + } else if (threadName.contains("Timer")) { // The timer threads in Derby and JDBC use daemon and shutdown once user threads exit. return true; diff --git a/pom.xml b/pom.xml index ba4d6a5ff4..d0327fa7b1 100644 --- a/pom.xml +++ b/pom.xml @@ -229,6 +229,14 @@ + + org.postgresql + postgresql + 9.4-1205-jdbc4 + provided + + + commons-collections commons-collections-testframework diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 90f54254f8..b0303c7cb4 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -240,11 +240,20 @@ jboss-javaee 5.0.0.GA + + org.apache.derby derby - ${apache.derby.version} + test + + + org.postgresql + postgresql + test + + io.vertx diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java index 6664d1ee09..03ed5b04cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/LargeMessageTestBase.java @@ -79,6 +79,13 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase { this.storeType = storeType; } + public void tearDown() throws Exception { + super.tearDown(); + if (storeType == StoreConfiguration.StoreType.DATABASE) { + destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE")); + } + } + @Parameterized.Parameters(name = "storeType={0}") public static Collection data() { Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java index 391cccf50d..4f4c5de53b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RolesConfigurationStorageTest.java @@ -40,7 +40,6 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase { @Before public void setUp() throws Exception { super.setUp(); - mapExpectedSets = new HashMap<>(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index 75e5eb28b7..886cde3ab0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.persistence; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -105,15 +103,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { jmsJournal = null; } - // Stops the database engine early to stop thread leaks showing. - if (storeType == StoreConfiguration.StoreType.DATABASE) { - try { - DriverManager.getConnection("jdbc:derby:;shutdown=true"); - } - catch (SQLException e) { - } - } - + destroyTables(Arrays.asList(new String[] {"MESSAGE", "BINDINGS", "LARGE_MESSAGE"})); super.tearDown(); if (exception != null) throw exception; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java index 5394565716..06a3f77fbc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/xa/BasicXaRecoveryTest.java @@ -125,6 +125,9 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase { public void tearDown() throws Exception { MBeanServerFactory.releaseMBeanServer(mbeanServer); super.tearDown(); + if (storeType == StoreConfiguration.StoreType.DATABASE) { + destroyTables(Arrays.asList("BINDINGS", "LARGE_MESSAGE", "MESSAGE")); + } } @Test diff --git a/tests/stress-tests/pom.xml b/tests/stress-tests/pom.xml index 4dc798836d..5373434bd5 100644 --- a/tests/stress-tests/pom.xml +++ b/tests/stress-tests/pom.xml @@ -111,6 +111,12 @@ org.apache.geronimo.specs geronimo-jms_2.0_spec + + org.apache.derby + derby + ${apache.derby.version} + test +