Add PostGres Driver
This commit is contained in:
parent
466d43c63d
commit
79904aeb64
|
@ -53,12 +53,18 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Database driver support -->
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-journal</artifactId>
|
||||
|
|
|
@ -24,7 +24,9 @@ import java.sql.SQLException;
|
|||
import java.sql.Statement;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
|
||||
|
@ -70,24 +72,35 @@ public class JDBCUtils {
|
|||
if (driverClass.contains("derby")) {
|
||||
return new DerbySQLProvider(tableName);
|
||||
}
|
||||
else if (driverClass.contains("postgres")) {
|
||||
return new PostgresSQLProvider(tableName);
|
||||
}
|
||||
else {
|
||||
return new GenericSQLProvider(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
|
||||
JDBCFileFactoryDriver dbDriver;
|
||||
public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
|
||||
String tableName,
|
||||
String jdbcConnectionUrl) throws SQLException {
|
||||
JDBCSequentialFileFactoryDriver dbDriver;
|
||||
if (driverClass.contains("derby")) {
|
||||
dbDriver = new JDBCFileFactoryDriver();
|
||||
dbDriver = new JDBCSequentialFileFactoryDriver();
|
||||
dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
|
||||
dbDriver.setConnectionURL(jdbcConnectionUrl);
|
||||
dbDriver.setDriverClass(driverClass);
|
||||
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
dbDriver.setJdbcDriverClass(driverClass);
|
||||
}
|
||||
else if (driverClass.contains("postgres")) {
|
||||
dbDriver = new PostgresSequentialSequentialFileDriver();
|
||||
dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
|
||||
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
dbDriver.setJdbcDriverClass(driverClass);
|
||||
}
|
||||
else {
|
||||
dbDriver = new JDBCFileFactoryDriver();
|
||||
dbDriver = new JDBCSequentialFileFactoryDriver();
|
||||
dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
|
||||
dbDriver.setConnectionURL(jdbcConnectionUrl);
|
||||
dbDriver.setDriverClass(driverClass);
|
||||
dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
|
||||
dbDriver.setJdbcDriverClass(driverClass);
|
||||
}
|
||||
return dbDriver;
|
||||
}
|
||||
|
|
|
@ -33,14 +33,17 @@ public abstract class AbstractJDBCDriver {
|
|||
|
||||
protected Connection connection;
|
||||
|
||||
protected final SQLProvider sqlProvider;
|
||||
protected SQLProvider sqlProvider;
|
||||
|
||||
protected final String jdbcConnectionUrl;
|
||||
protected String jdbcConnectionUrl;
|
||||
|
||||
protected final String jdbcDriverClass;
|
||||
protected String jdbcDriverClass;
|
||||
|
||||
protected Driver dbDriver;
|
||||
|
||||
public AbstractJDBCDriver() {
|
||||
}
|
||||
|
||||
public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
|
||||
this.jdbcConnectionUrl = jdbcConnectionUrl;
|
||||
this.jdbcDriverClass = jdbcDriverClass;
|
||||
|
@ -53,7 +56,7 @@ public abstract class AbstractJDBCDriver {
|
|||
prepareStatements();
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
public void stop() throws SQLException {
|
||||
if (sqlProvider.closeConnectionOnShutdown()) {
|
||||
connection.close();
|
||||
}
|
||||
|
@ -79,10 +82,48 @@ public abstract class AbstractJDBCDriver {
|
|||
}
|
||||
|
||||
public void destroy() throws Exception {
|
||||
connection.setAutoCommit(false);
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
|
||||
statement.close();
|
||||
connection.commit();
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
|
||||
statement.close();
|
||||
connection.commit();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
||||
public void setConnection(Connection connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
public SQLProvider getSqlProvider() {
|
||||
return sqlProvider;
|
||||
}
|
||||
|
||||
public void setSqlProvider(SQLProvider sqlProvider) {
|
||||
this.sqlProvider = sqlProvider;
|
||||
}
|
||||
|
||||
public String getJdbcConnectionUrl() {
|
||||
return jdbcConnectionUrl;
|
||||
}
|
||||
|
||||
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
|
||||
this.jdbcConnectionUrl = jdbcConnectionUrl;
|
||||
}
|
||||
|
||||
public String getJdbcDriverClass() {
|
||||
return jdbcDriverClass;
|
||||
}
|
||||
|
||||
public void setJdbcDriverClass(String jdbcDriverClass) {
|
||||
this.jdbcDriverClass = jdbcDriverClass;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
|
||||
public class PostgresSQLProvider extends GenericSQLProvider {
|
||||
|
||||
// BYTEA Size used in Journal
|
||||
private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
|
||||
|
||||
private final String createFileTableSQL;
|
||||
|
||||
private final String createJournalTableSQL;
|
||||
|
||||
public PostgresSQLProvider(String tName) {
|
||||
super(tName.toLowerCase());
|
||||
createFileTableSQL = "CREATE TABLE " + tableName +
|
||||
"(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
|
||||
|
||||
createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCreateFileTableSQL() {
|
||||
return createFileTableSQL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCreateJournalTableSQL() {
|
||||
return createJournalTableSQL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxBlobSize() {
|
||||
return MAX_BLOB_SIZE;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||
import org.postgresql.PGConnection;
|
||||
import org.postgresql.largeobject.LargeObject;
|
||||
import org.postgresql.largeobject.LargeObjectManager;
|
||||
|
||||
public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
|
||||
|
||||
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
|
||||
|
||||
public PostgresSequentialSequentialFileDriver() throws SQLException {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
long oid = lobjManager.createLO();
|
||||
|
||||
createFile.setString(1, file.getFileName());
|
||||
createFile.setString(2, file.getExtension());
|
||||
createFile.setLong(3, oid);
|
||||
createFile.executeUpdate();
|
||||
|
||||
try (ResultSet keys = createFile.getGeneratedKeys()) {
|
||||
keys.next();
|
||||
file.setId(keys.getInt(1));
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
|
||||
try (ResultSet rs = readLargeObject.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
file.setWritePosition(getPostGresLargeObjectSize(file));
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
LargeObject largeObject = null;
|
||||
|
||||
Long oid = getOID(file);
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
|
||||
largeObject.seek(largeObject.size());
|
||||
largeObject.write(data);
|
||||
largeObject.close();
|
||||
connection.commit();
|
||||
}
|
||||
catch (Exception e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
return data.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
LargeObject largeObject = null;
|
||||
long oid = getOID(file);
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
largeObject = lobjManager.open(oid, LargeObjectManager.READ);
|
||||
int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
|
||||
|
||||
if (readLength > 0) {
|
||||
if (file.position() > 0) largeObject.seek((int) file.position());
|
||||
byte[] data = largeObject.read(readLength);
|
||||
bytes.put(data);
|
||||
}
|
||||
|
||||
largeObject.close();
|
||||
connection.commit();
|
||||
|
||||
return readLength;
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
|
||||
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
|
||||
if (oid == null) {
|
||||
connection.setAutoCommit(false);
|
||||
readLargeObject.setInt(1, file.getId());
|
||||
try (ResultSet rs = readLargeObject.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
|
||||
}
|
||||
connection.commit();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
|
||||
System.out.println("FD");
|
||||
}
|
||||
return (Long) file.getMetaData(POSTGRES_OID_KEY);
|
||||
}
|
||||
|
||||
private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
|
||||
LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
|
||||
|
||||
int size = 0;
|
||||
Long oid = getOID(file);
|
||||
if (oid != null) {
|
||||
try {
|
||||
connection.setAutoCommit(false);
|
||||
LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
|
||||
size = largeObject.size();
|
||||
largeObject.close();
|
||||
connection.commit();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
connection.rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -59,9 +59,7 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
|
||||
private final Object writeLock;
|
||||
|
||||
private final JDBCFileFactoryDriver dbDriver;
|
||||
|
||||
private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
|
||||
private final JDBCSequentialFileFactoryDriver dbDriver;
|
||||
|
||||
// Allows DB Drivers to cache meta data.
|
||||
private Map<Object, Object> metaData = new ConcurrentHashMap<>();
|
||||
|
@ -69,7 +67,7 @@ public class JDBCSequentialFile implements SequentialFile {
|
|||
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
|
||||
final String filename,
|
||||
final Executor executor,
|
||||
final JDBCFileFactoryDriver driver,
|
||||
final JDBCSequentialFileFactoryDriver driver,
|
||||
final Object writeLock) throws SQLException {
|
||||
this.fileFactory = fileFactory;
|
||||
this.filename = filename;
|
||||
|
|
|
@ -42,7 +42,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
|
||||
private Map<String, Object> fileLocks = new HashMap<>();
|
||||
|
||||
private final JDBCFileFactoryDriver dbDriver;
|
||||
private final JDBCSequentialFileFactoryDriver dbDriver;
|
||||
|
||||
public JDBCSequentialFileFactory(final String connectionUrl,
|
||||
final String tableName,
|
||||
|
@ -184,5 +184,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
}
|
||||
|
||||
public synchronized void destroy() throws SQLException {
|
||||
dbDriver.destroy();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,24 +18,16 @@ package org.apache.activemq.artemis.jdbc.store.file;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.Blob;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Driver;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
|
||||
|
||||
public class JDBCFileFactoryDriver {
|
||||
|
||||
protected Connection connection;
|
||||
|
||||
protected SQLProvider sqlProvider;
|
||||
public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
||||
|
||||
protected PreparedStatement deleteFile;
|
||||
|
||||
|
@ -53,37 +45,24 @@ public class JDBCFileFactoryDriver {
|
|||
|
||||
protected PreparedStatement selectFileNamesByExtension;
|
||||
|
||||
protected String connectionUrl;
|
||||
|
||||
protected String driverClass;
|
||||
|
||||
public JDBCFileFactoryDriver() {
|
||||
public JDBCSequentialFileFactoryDriver() {
|
||||
super();
|
||||
}
|
||||
|
||||
public void setConnectionURL(String connectionUrl) {
|
||||
this.connectionUrl = connectionUrl;
|
||||
}
|
||||
|
||||
public void setSqlProvider(SQLProvider sqlProvider) {
|
||||
this.sqlProvider = sqlProvider;
|
||||
}
|
||||
|
||||
public void setDriverClass(String driverClass) {
|
||||
this.driverClass = driverClass;
|
||||
public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
|
||||
super(tableName, jdbcConnectionUrl, jdbcDriverClass);
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
Driver driver = JDBCUtils.getDriver(driverClass);
|
||||
connection = driver.connect(connectionUrl, new Properties());
|
||||
JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
|
||||
prepareStatements();
|
||||
super.start();
|
||||
}
|
||||
|
||||
public void stop() throws SQLException {
|
||||
if (sqlProvider.closeConnectionOnShutdown())
|
||||
connection.close();
|
||||
@Override
|
||||
protected void createSchema() throws SQLException {
|
||||
createTable(sqlProvider.getCreateFileTableSQL());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prepareStatements() throws SQLException {
|
||||
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
|
||||
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
|
|
@ -95,6 +95,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
createTable(sqlProvider.getCreateJournalTableSQL());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prepareStatements() throws SQLException {
|
||||
insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
|
||||
selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
|
||||
|
@ -104,7 +105,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() throws Exception {
|
||||
public synchronized void stop() throws SQLException {
|
||||
if (started) {
|
||||
synchronized (journalLock) {
|
||||
syncTimer.cancel();
|
||||
|
|
8
pom.xml
8
pom.xml
|
@ -229,6 +229,14 @@
|
|||
<!-- Eclipse Public License - v 1.0 -->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>9.4-1205-jdbc4</version>
|
||||
<scope>provided</scope>
|
||||
<!-- postgresql license -->
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-collections</groupId>
|
||||
<artifactId>commons-collections-testframework</artifactId>
|
||||
|
|
|
@ -240,10 +240,18 @@
|
|||
<artifactId>jboss-javaee</artifactId>
|
||||
<version>5.0.0.GA</version>
|
||||
</dependency>
|
||||
|
||||
<!-- DB Test Deps -->
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<version>${apache.derby.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!--Vertx provided dependencies-->
|
||||
<dependency>
|
||||
|
|
Loading…
Reference in New Issue