[ARTEMIS-1590] Properties-based SQLProvider
(cherry picked from commit 938fbd81cb
)
This commit is contained in:
parent
f005da6dfd
commit
2a0b9039a3
|
@ -18,11 +18,7 @@ package org.apache.activemq.artemis.jdbc.store.drivers;
|
||||||
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.oracle.Oracle12CSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -31,44 +27,15 @@ public class JDBCUtils {
|
||||||
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
|
private static final Logger logger = Logger.getLogger(JDBCUtils.class);
|
||||||
|
|
||||||
public static SQLProvider.Factory getSQLProviderFactory(String url) {
|
public static SQLProvider.Factory getSQLProviderFactory(String url) {
|
||||||
SQLProvider.Factory factory;
|
PropertySQLProvider.Factory.SQLDialect dialect = PropertySQLProvider.Factory.identifyDialect(url);
|
||||||
if (url.contains("derby")) {
|
logger.tracef("getSQLProvider Returning SQL provider for dialect %s for url::%s", dialect, url);
|
||||||
logger.tracef("getSQLProvider Returning Derby SQL provider for url::%s", url);
|
return new PropertySQLProvider.Factory(dialect);
|
||||||
factory = new DerbySQLProvider.Factory();
|
|
||||||
} else if (url.contains("postgres")) {
|
|
||||||
logger.tracef("getSQLProvider Returning postgres SQL provider for url::%s", url);
|
|
||||||
factory = new PostgresSQLProvider.Factory();
|
|
||||||
} else if (url.contains("mysql")) {
|
|
||||||
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
|
|
||||||
factory = new MySQLSQLProvider.Factory();
|
|
||||||
} else if (url.contains("oracle")) {
|
|
||||||
logger.tracef("getSQLProvider Returning Oracle12C SQL provider for url::%s", url);
|
|
||||||
factory = new Oracle12CSQLProvider.Factory();
|
|
||||||
} else {
|
|
||||||
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
|
|
||||||
factory = new GenericSQLProvider.Factory();
|
|
||||||
}
|
|
||||||
return factory;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SQLProvider getSQLProvider(String driverClass, String tableName, SQLProvider.DatabaseStoreType storeType) {
|
public static SQLProvider getSQLProvider(String driverClass, String tableName, SQLProvider.DatabaseStoreType storeType) {
|
||||||
SQLProvider.Factory factory;
|
PropertySQLProvider.Factory.SQLDialect dialect = PropertySQLProvider.Factory.identifyDialect(driverClass);
|
||||||
if (driverClass.contains("derby")) {
|
logger.tracef("getSQLProvider Returning SQL provider for dialect %s for driver::%s, tableName::%s", dialect, driverClass, tableName);
|
||||||
logger.tracef("getSQLProvider Returning Derby SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
PropertySQLProvider.Factory factory = new PropertySQLProvider.Factory(dialect);
|
||||||
factory = new DerbySQLProvider.Factory();
|
|
||||||
} else if (driverClass.contains("postgres")) {
|
|
||||||
logger.tracef("getSQLProvider Returning postgres SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
|
||||||
factory = new PostgresSQLProvider.Factory();
|
|
||||||
} else if (driverClass.contains("mysql")) {
|
|
||||||
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
|
||||||
factory = new MySQLSQLProvider.Factory();
|
|
||||||
} else if (driverClass.contains("oracle")) {
|
|
||||||
logger.tracef("getSQLProvider Returning Oracle12C SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
|
||||||
factory = new Oracle12CSQLProvider.Factory();
|
|
||||||
} else {
|
|
||||||
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
|
||||||
factory = new GenericSQLProvider.Factory();
|
|
||||||
}
|
|
||||||
return factory.create(tableName, storeType);
|
return factory.create(tableName, storeType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.jdbc.store.drivers.derby;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
|
||||||
|
|
||||||
public class DerbySQLProvider extends GenericSQLProvider {
|
|
||||||
|
|
||||||
// Derby max blob size = 2G
|
|
||||||
private static final int MAX_BLOB_SIZE = 2147483647;
|
|
||||||
|
|
||||||
private final String createFileTableSQL;
|
|
||||||
|
|
||||||
private DerbySQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
super(tableName.toUpperCase(), databaseStoreType);
|
|
||||||
|
|
||||||
createFileTableSQL = "CREATE TABLE " + tableName +
|
|
||||||
"(ID BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
|
|
||||||
"FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMaxBlobSize() {
|
|
||||||
return MAX_BLOB_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCreateFileTableSQL() {
|
|
||||||
return createFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean closeConnectionOnShutdown() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory implements SQLProvider.Factory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
return new DerbySQLProvider(tableName, databaseStoreType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,75 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.jdbc.store.drivers.mysql;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
|
||||||
|
|
||||||
public class MySQLSQLProvider extends GenericSQLProvider {
|
|
||||||
|
|
||||||
private static final long MAX_BLOB_SIZE = 4 * 1024 * 1024 * 1024; // 4GB
|
|
||||||
|
|
||||||
private final String createFileTableSQL;
|
|
||||||
|
|
||||||
private final String[] createJournalTableSQL;
|
|
||||||
|
|
||||||
private final String copyFileRecordByIdSQL;
|
|
||||||
|
|
||||||
private MySQLSQLProvider(String tName, DatabaseStoreType databaseStoreType) {
|
|
||||||
super(tName.toLowerCase(), databaseStoreType);
|
|
||||||
|
|
||||||
createFileTableSQL = "CREATE TABLE " + tableName +
|
|
||||||
"(ID BIGINT NOT NULL AUTO_INCREMENT," +
|
|
||||||
"FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA LONGBLOB, PRIMARY KEY(ID)) ENGINE=InnoDB;";
|
|
||||||
|
|
||||||
createJournalTableSQL = new String[] {
|
|
||||||
"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB;",
|
|
||||||
"CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
|
|
||||||
};
|
|
||||||
|
|
||||||
copyFileRecordByIdSQL = " UPDATE " + tableName + ", (SELECT DATA AS FROM_DATA FROM " + tableName +
|
|
||||||
" WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?;";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMaxBlobSize() {
|
|
||||||
return MAX_BLOB_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCreateFileTableSQL() {
|
|
||||||
return createFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String[] getCreateJournalTableSQL() {
|
|
||||||
return createJournalTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCopyFileRecordByIdSQL() {
|
|
||||||
return copyFileRecordByIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory implements SQLProvider.Factory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
return new MySQLSQLProvider(tableName, databaseStoreType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,64 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.jdbc.store.drivers.oracle;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
|
||||||
|
|
||||||
public class Oracle12CSQLProvider extends GenericSQLProvider {
|
|
||||||
|
|
||||||
/**
|
|
||||||
create-file-table.oracle=CREATE TABLE %s(ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
|
|
||||||
create-journal-table.oracle=CREATE TABLE %s(id NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY,recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))
|
|
||||||
max-blob-size.oracle=4294967296 **/
|
|
||||||
|
|
||||||
private final String createFileTableSQL = "CREATE TABLE " + tableName + " (ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
|
|
||||||
|
|
||||||
private final String createJournalTableSQL = "CREATE TABLE " + tableName + " (id NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY,recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))";
|
|
||||||
|
|
||||||
private static final long MAX_BLOB_SIZE = 4294967296L; //4GB
|
|
||||||
|
|
||||||
protected Oracle12CSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
super(tableName.toUpperCase(), databaseStoreType);
|
|
||||||
if (tableName.length() > 30) {
|
|
||||||
throw new RuntimeException("The maximum name size for the " + databaseStoreType.name().toLowerCase() + " store table, when using Oracle12C is 30 characters.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMaxBlobSize() {
|
|
||||||
return MAX_BLOB_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCreateFileTableSQL() {
|
|
||||||
return createFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String[] getCreateJournalTableSQL() {
|
|
||||||
return new String[] {createJournalTableSQL};
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory implements SQLProvider.Factory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
return new Oracle12CSQLProvider(tableName, databaseStoreType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,65 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
|
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
|
||||||
|
|
||||||
public class PostgresSQLProvider extends GenericSQLProvider {
|
|
||||||
|
|
||||||
// BYTEA Size used in Journal
|
|
||||||
private static final long MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
|
|
||||||
|
|
||||||
private final String createFileTableSQL;
|
|
||||||
|
|
||||||
private final String[] createJournalTableSQL;
|
|
||||||
|
|
||||||
private PostgresSQLProvider(String tName, DatabaseStoreType databaseStoreType) {
|
|
||||||
super(tName.toLowerCase(), databaseStoreType);
|
|
||||||
createFileTableSQL = "CREATE TABLE " + tableName +
|
|
||||||
"(ID BIGSERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
|
|
||||||
|
|
||||||
createJournalTableSQL = new String[] {
|
|
||||||
"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)",
|
|
||||||
"CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCreateFileTableSQL() {
|
|
||||||
return createFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String[] getCreateJournalTableSQL() {
|
|
||||||
return createJournalTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMaxBlobSize() {
|
|
||||||
return MAX_BLOB_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory implements SQLProvider.Factory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SQLProvider create(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
return new PostgresSQLProvider(tableName, databaseStoreType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -21,16 +21,18 @@ import javax.sql.DataSource;
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.POSTGRESQL;
|
||||||
|
|
||||||
class JDBCFileUtils {
|
class JDBCFileUtils {
|
||||||
|
|
||||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
|
static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
|
||||||
String jdbcConnectionUrl,
|
String jdbcConnectionUrl,
|
||||||
SQLProvider provider) throws SQLException {
|
SQLProvider provider) throws SQLException {
|
||||||
final JDBCSequentialFileFactoryDriver dbDriver;
|
final JDBCSequentialFileFactoryDriver dbDriver;
|
||||||
if (provider instanceof PostgresSQLProvider) {
|
if (POSTGRESQL.equals(PropertySQLProvider.Factory.identifyDialect(driverClass))) {
|
||||||
dbDriver = new PostgresSequentialSequentialFileDriver();
|
dbDriver = new PostgresSequentialSequentialFileDriver();
|
||||||
} else {
|
} else {
|
||||||
dbDriver = new JDBCSequentialFileFactoryDriver();
|
dbDriver = new JDBCSequentialFileFactoryDriver();
|
||||||
|
@ -43,7 +45,7 @@ class JDBCFileUtils {
|
||||||
|
|
||||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
|
static JDBCSequentialFileFactoryDriver getDBFileDriver(DataSource dataSource, SQLProvider provider) throws SQLException {
|
||||||
JDBCSequentialFileFactoryDriver dbDriver;
|
JDBCSequentialFileFactoryDriver dbDriver;
|
||||||
if (provider instanceof PostgresSQLProvider) {
|
if (POSTGRESQL.equals(PropertySQLProvider.Factory.investigateDialect(dataSource.getConnection()))) {
|
||||||
dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider);
|
dbDriver = new PostgresSequentialSequentialFileDriver(dataSource, provider);
|
||||||
} else {
|
} else {
|
||||||
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
|
dbDriver = new JDBCSequentialFileFactoryDriver(dataSource, provider);
|
||||||
|
@ -53,7 +55,7 @@ class JDBCFileUtils {
|
||||||
|
|
||||||
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
|
static JDBCSequentialFileFactoryDriver getDBFileDriver(Connection connection, SQLProvider provider) throws SQLException {
|
||||||
JDBCSequentialFileFactoryDriver dbDriver;
|
JDBCSequentialFileFactoryDriver dbDriver;
|
||||||
if (provider instanceof PostgresSQLProvider) {
|
if (POSTGRESQL.equals(PropertySQLProvider.Factory.investigateDialect(connection))) {
|
||||||
dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider);
|
dbDriver = new PostgresSequentialSequentialFileDriver(connection, provider);
|
||||||
dbDriver.setConnection(connection);
|
dbDriver.setConnection(connection);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,391 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.jdbc.store.sql;
|
|
||||||
|
|
||||||
public class GenericSQLProvider implements SQLProvider {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The JDBC Node Manager shared state is contained in these 4 rows: each one is used exclusively for a specific purpose.
|
|
||||||
*/
|
|
||||||
private static final int STATE_ROW_ID = 0;
|
|
||||||
private static final int LIVE_LOCK_ROW_ID = 1;
|
|
||||||
private static final int BACKUP_LOCK_ROW_ID = 2;
|
|
||||||
private static final int NODE_ID_ROW_ID = 3;
|
|
||||||
|
|
||||||
// Default to lowest (MYSQL = 64k)
|
|
||||||
private static final long MAX_BLOB_SIZE = 64512;
|
|
||||||
|
|
||||||
protected final String tableName;
|
|
||||||
|
|
||||||
private final String createFileTableSQL;
|
|
||||||
|
|
||||||
private final String insertFileSQL;
|
|
||||||
|
|
||||||
private final String selectFileNamesByExtensionSQL;
|
|
||||||
|
|
||||||
private final String selectIdByFileNameSQL;
|
|
||||||
|
|
||||||
private final String appendToFileSQL;
|
|
||||||
|
|
||||||
private final String readLargeObjectSQL;
|
|
||||||
|
|
||||||
private final String deleteFileSQL;
|
|
||||||
|
|
||||||
private final String updateFileNameByIdSQL;
|
|
||||||
|
|
||||||
private final String copyFileRecordByIdSQL;
|
|
||||||
|
|
||||||
private final String cloneFileRecordSQL;
|
|
||||||
|
|
||||||
private final String dropFileTableSQL;
|
|
||||||
|
|
||||||
private final String[] createJournalTableSQL;
|
|
||||||
|
|
||||||
private final String insertJournalRecordsSQL;
|
|
||||||
|
|
||||||
private final String selectJournalRecordsSQL;
|
|
||||||
|
|
||||||
private final String deleteJournalRecordsSQL;
|
|
||||||
|
|
||||||
private final String deleteJournalTxRecordsSQL;
|
|
||||||
|
|
||||||
private final String countJournalRecordsSQL;
|
|
||||||
|
|
||||||
private final String createNodeManagerStoreTableSQL;
|
|
||||||
|
|
||||||
private final String createStateSQL;
|
|
||||||
|
|
||||||
private final String createNodeIdSQL;
|
|
||||||
|
|
||||||
private final String createLiveLockSQL;
|
|
||||||
|
|
||||||
private final String createBackupLockSQL;
|
|
||||||
|
|
||||||
private final String tryAcquireLiveLockSQL;
|
|
||||||
|
|
||||||
private final String tryAcquireBackupLockSQL;
|
|
||||||
|
|
||||||
private final String tryReleaseLiveLockSQL;
|
|
||||||
|
|
||||||
private final String tryReleaseBackupLockSQL;
|
|
||||||
|
|
||||||
private final String isLiveLockedSQL;
|
|
||||||
|
|
||||||
private final String isBackupLockedSQL;
|
|
||||||
|
|
||||||
private final String renewLiveLockSQL;
|
|
||||||
|
|
||||||
private final String renewBackupLockSQL;
|
|
||||||
|
|
||||||
private final String currentTimestampSQL;
|
|
||||||
|
|
||||||
private final String writeStateSQL;
|
|
||||||
|
|
||||||
private final String readStateSQL;
|
|
||||||
|
|
||||||
private final String writeNodeIdSQL;
|
|
||||||
|
|
||||||
private final String initializeNodeIdSQL;
|
|
||||||
|
|
||||||
private final String readNodeIdSQL;
|
|
||||||
|
|
||||||
protected final DatabaseStoreType databaseStoreType;
|
|
||||||
|
|
||||||
protected GenericSQLProvider(String tableName, DatabaseStoreType databaseStoreType) {
|
|
||||||
this.tableName = tableName;
|
|
||||||
|
|
||||||
this.databaseStoreType = databaseStoreType;
|
|
||||||
|
|
||||||
createFileTableSQL = "CREATE TABLE " + tableName + "(ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
|
|
||||||
|
|
||||||
insertFileSQL = "INSERT INTO " + tableName + " (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
|
|
||||||
|
|
||||||
selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
|
|
||||||
|
|
||||||
selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
|
|
||||||
|
|
||||||
appendToFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=? FOR UPDATE";
|
|
||||||
|
|
||||||
readLargeObjectSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
|
|
||||||
|
|
||||||
deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
|
|
||||||
|
|
||||||
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
|
|
||||||
|
|
||||||
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " + "(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
|
|
||||||
|
|
||||||
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
|
|
||||||
|
|
||||||
dropFileTableSQL = "DROP TABLE " + tableName;
|
|
||||||
|
|
||||||
createJournalTableSQL = new String[]{"CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))", "CREATE INDEX " + tableName + "_IDX ON " + tableName + " (id)"};
|
|
||||||
|
|
||||||
insertJournalRecordsSQL = "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
|
|
||||||
|
|
||||||
selectJournalRecordsSQL = "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq " + "FROM " + tableName + " ORDER BY seq ASC";
|
|
||||||
|
|
||||||
deleteJournalRecordsSQL = "DELETE FROM " + tableName + " WHERE id = ?";
|
|
||||||
|
|
||||||
deleteJournalTxRecordsSQL = "DELETE FROM " + tableName + " WHERE txId=?";
|
|
||||||
|
|
||||||
countJournalRecordsSQL = "SELECT COUNT(*) FROM " + tableName;
|
|
||||||
|
|
||||||
createNodeManagerStoreTableSQL = "CREATE TABLE " + tableName + " ( ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))";
|
|
||||||
|
|
||||||
createStateSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + STATE_ROW_ID + ")";
|
|
||||||
|
|
||||||
createNodeIdSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + NODE_ID_ROW_ID + ")";
|
|
||||||
|
|
||||||
createLiveLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + LIVE_LOCK_ROW_ID + ")";
|
|
||||||
|
|
||||||
createBackupLockSQL = "INSERT INTO " + tableName + " (ID) VALUES (" + BACKUP_LOCK_ROW_ID + ")";
|
|
||||||
|
|
||||||
tryAcquireLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + LIVE_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
tryAcquireBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = " + BACKUP_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
tryReleaseLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
tryReleaseBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
isLiveLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + LIVE_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
isBackupLockedSQL = "SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM " + tableName + " WHERE ID = " + BACKUP_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
renewLiveLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + LIVE_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
renewBackupLockSQL = "UPDATE " + tableName + " SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = " + BACKUP_LOCK_ROW_ID;
|
|
||||||
|
|
||||||
currentTimestampSQL = "SELECT CURRENT_TIMESTAMP FROM " + tableName;
|
|
||||||
|
|
||||||
writeStateSQL = "UPDATE " + tableName + " SET STATE = ? WHERE ID = " + STATE_ROW_ID;
|
|
||||||
|
|
||||||
readStateSQL = "SELECT STATE FROM " + tableName + " WHERE ID = " + STATE_ROW_ID;
|
|
||||||
|
|
||||||
writeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE ID = " + NODE_ID_ROW_ID;
|
|
||||||
|
|
||||||
initializeNodeIdSQL = "UPDATE " + tableName + " SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = " + NODE_ID_ROW_ID;
|
|
||||||
|
|
||||||
readNodeIdSQL = "SELECT NODE_ID FROM " + tableName + " WHERE ID = " + NODE_ID_ROW_ID;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getMaxBlobSize() {
|
|
||||||
return MAX_BLOB_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getTableName() {
|
|
||||||
return tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Journal SQL Statements
|
|
||||||
@Override
|
|
||||||
public String[] getCreateJournalTableSQL() {
|
|
||||||
return createJournalTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getInsertJournalRecordsSQL() {
|
|
||||||
return insertJournalRecordsSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getSelectJournalRecordsSQL() {
|
|
||||||
return selectJournalRecordsSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDeleteJournalRecordsSQL() {
|
|
||||||
return deleteJournalRecordsSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDeleteJournalTxRecordsSQL() {
|
|
||||||
return deleteJournalTxRecordsSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCountJournalRecordsSQL() {
|
|
||||||
return countJournalRecordsSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Large Message Statements
|
|
||||||
@Override
|
|
||||||
public String getCreateFileTableSQL() {
|
|
||||||
return createFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getInsertFileSQL() {
|
|
||||||
return insertFileSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getSelectFileByFileName() {
|
|
||||||
return selectIdByFileNameSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getSelectFileNamesByExtensionSQL() {
|
|
||||||
return selectFileNamesByExtensionSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getAppendToLargeObjectSQL() {
|
|
||||||
return appendToFileSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getReadLargeObjectSQL() {
|
|
||||||
return readLargeObjectSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDeleteFileSQL() {
|
|
||||||
return deleteFileSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getUpdateFileNameByIdSQL() {
|
|
||||||
return updateFileNameByIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCopyFileRecordByIdSQL() {
|
|
||||||
return copyFileRecordByIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getCloneFileRecordByIdSQL() {
|
|
||||||
return cloneFileRecordSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getDropFileTableSQL() {
|
|
||||||
return dropFileTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String createNodeManagerStoreTableSQL() {
|
|
||||||
return createNodeManagerStoreTableSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String createStateSQL() {
|
|
||||||
return createStateSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String createNodeIdSQL() {
|
|
||||||
return createNodeIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String createLiveLockSQL() {
|
|
||||||
return createLiveLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String createBackupLockSQL() {
|
|
||||||
return createBackupLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String tryAcquireLiveLockSQL() {
|
|
||||||
return tryAcquireLiveLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String tryAcquireBackupLockSQL() {
|
|
||||||
return tryAcquireBackupLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String tryReleaseLiveLockSQL() {
|
|
||||||
return tryReleaseLiveLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String tryReleaseBackupLockSQL() {
|
|
||||||
return tryReleaseBackupLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String isLiveLockedSQL() {
|
|
||||||
return isLiveLockedSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String isBackupLockedSQL() {
|
|
||||||
return isBackupLockedSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String renewLiveLockSQL() {
|
|
||||||
return renewLiveLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String renewBackupLockSQL() {
|
|
||||||
return renewBackupLockSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String currentTimestampSQL() {
|
|
||||||
return currentTimestampSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String writeStateSQL() {
|
|
||||||
return writeStateSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String readStateSQL() {
|
|
||||||
return readStateSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String writeNodeIdSQL() {
|
|
||||||
return writeNodeIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String readNodeIdSQL() {
|
|
||||||
return readNodeIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String initializeNodeIdSQL() {
|
|
||||||
return initializeNodeIdSQL;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean closeConnectionOnShutdown() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class Factory implements SQLProvider.Factory {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SQLProvider create(String tableName, DatabaseStoreType storeType) {
|
|
||||||
return new GenericSQLProvider(tableName, storeType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.jdbc.store.sql;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.ORACLE;
|
||||||
|
|
||||||
|
class Oracle12CSQLProvider extends PropertySQLProvider {
|
||||||
|
|
||||||
|
Oracle12CSQLProvider(String tableName, Properties sqlProperties, DatabaseStoreType storeType) {
|
||||||
|
super(ORACLE, tableName, sqlProperties);
|
||||||
|
if (getTableName().length() > 30) {
|
||||||
|
throw new RuntimeException("The maximum name size for the " + storeType.name().toLowerCase() + " store table, when using Oracle12C is 30 characters.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,408 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.jdbc.store.sql;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DatabaseMetaData;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
import static java.lang.String.format;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Property-based implementation of a {@link SQLProvider}'s factory.
|
||||||
|
*
|
||||||
|
* Properties are stored in a journal-sql.properties.
|
||||||
|
*
|
||||||
|
* Dialects specific to a database can be customized by suffixing the property keys with the name of the dialect.
|
||||||
|
*/
|
||||||
|
public class PropertySQLProvider implements SQLProvider {
|
||||||
|
|
||||||
|
private enum LetterCase implements Function<String, String> {
|
||||||
|
upper(String::toUpperCase),
|
||||||
|
lower(String::toLowerCase),
|
||||||
|
none(Function.identity());
|
||||||
|
|
||||||
|
private final Function<String, String> transform;
|
||||||
|
|
||||||
|
LetterCase(Function<String, String> transform) {
|
||||||
|
this.transform = transform;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String apply(String s) {
|
||||||
|
return transform.apply(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static LetterCase parse(String value) {
|
||||||
|
return LetterCase.valueOf(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final int STATE_ROW_ID = 0;
|
||||||
|
private static final int LIVE_LOCK_ROW_ID = 1;
|
||||||
|
private static final int BACKUP_LOCK_ROW_ID = 2;
|
||||||
|
private static final int NODE_ID_ROW_ID = 3;
|
||||||
|
|
||||||
|
private final String tableName;
|
||||||
|
private final Factory.SQLDialect dialect;
|
||||||
|
private volatile Properties sql;
|
||||||
|
|
||||||
|
protected PropertySQLProvider(Factory.SQLDialect dialect, String tableName, Properties sqlProperties) {
|
||||||
|
this.dialect = dialect;
|
||||||
|
this.sql = sqlProperties;
|
||||||
|
final LetterCase tableNamesCase = LetterCase.parse(sql("table-names-case", dialect, sqlProperties));
|
||||||
|
this.tableName = tableNamesCase.apply(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMaxBlobSize() {
|
||||||
|
return Long.valueOf(sql("max-blob-size"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getCreateJournalTableSQL() {
|
||||||
|
return new String[] {
|
||||||
|
format(sql("create-journal-table"), tableName),
|
||||||
|
format(sql("create-journal-index"), tableName),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getInsertJournalRecordsSQL() {
|
||||||
|
return format(sql("insert-journal-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSelectJournalRecordsSQL() {
|
||||||
|
return format(sql("select-journal-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDeleteJournalRecordsSQL() {
|
||||||
|
return format(sql("delete-journal-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDeleteJournalTxRecordsSQL() {
|
||||||
|
return format(sql("delete-journal-tx-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTableName() {
|
||||||
|
return tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCreateFileTableSQL() {
|
||||||
|
return format(sql("create-file-table"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getInsertFileSQL() {
|
||||||
|
return format(sql("insert-file"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSelectFileNamesByExtensionSQL() {
|
||||||
|
return format(sql("select-filenames-by-extension"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getSelectFileByFileName() {
|
||||||
|
return format(sql("select-file-by-filename"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAppendToLargeObjectSQL() {
|
||||||
|
return format(sql("append-to-file"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getReadLargeObjectSQL() {
|
||||||
|
return format(sql("read-large-object"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDeleteFileSQL() {
|
||||||
|
return format(sql("delete-file"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUpdateFileNameByIdSQL() {
|
||||||
|
return format(sql("update-filename-by-id"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCopyFileRecordByIdSQL() {
|
||||||
|
return format(sql("copy-file-record-by-id"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDropFileTableSQL() {
|
||||||
|
return format(sql("drop-table"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCloneFileRecordByIdSQL() {
|
||||||
|
return format(sql("clone-file-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCountJournalRecordsSQL() {
|
||||||
|
return format(sql("count-journal-record"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean closeConnectionOnShutdown() {
|
||||||
|
return Boolean.valueOf(sql("close-connection-on-shutdown"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String createNodeManagerStoreTableSQL() {
|
||||||
|
return format(sql("create-node-manager-store-table"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String createStateSQL() {
|
||||||
|
return format(sql("create-state"), tableName, STATE_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String createNodeIdSQL() {
|
||||||
|
return format(sql("create-state"), tableName, NODE_ID_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String createLiveLockSQL() {
|
||||||
|
return format(sql("create-state"), tableName, LIVE_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String createBackupLockSQL() {
|
||||||
|
return format(sql("create-state"), tableName, BACKUP_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String tryAcquireLiveLockSQL() {
|
||||||
|
return format(sql("try-acquire-lock"), tableName, LIVE_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String tryAcquireBackupLockSQL() {
|
||||||
|
return format(sql("try-acquire-lock"), tableName, BACKUP_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String tryReleaseLiveLockSQL() {
|
||||||
|
return format(sql("try-release-lock"), tableName, LIVE_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String tryReleaseBackupLockSQL() {
|
||||||
|
return format(sql("try-release-lock"), tableName, BACKUP_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String isLiveLockedSQL() {
|
||||||
|
return format(sql("is-locked"), tableName, LIVE_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String isBackupLockedSQL() {
|
||||||
|
return format(sql("is-locked"), tableName, BACKUP_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String renewLiveLockSQL() {
|
||||||
|
return format(sql("renew-lock"), tableName, LIVE_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String renewBackupLockSQL() {
|
||||||
|
return format(sql("renew-lock"), tableName, BACKUP_LOCK_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String currentTimestampSQL() {
|
||||||
|
return format(sql("current-timestamp"), tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String writeStateSQL() {
|
||||||
|
return format(sql("write-state"), tableName, STATE_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String readStateSQL() {
|
||||||
|
return format(sql("read-state"), tableName, STATE_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String writeNodeIdSQL() {
|
||||||
|
return format(sql("write-nodeId"), tableName, NODE_ID_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String readNodeIdSQL() {
|
||||||
|
return format(sql("read-nodeId"), tableName, NODE_ID_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String initializeNodeIdSQL() {
|
||||||
|
return format(sql("initialize-nodeId"), tableName, NODE_ID_ROW_ID);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String sql(final String key) {
|
||||||
|
return sql(key, dialect, sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String sql(final String key, final Factory.SQLDialect dialect, final Properties sql) {
|
||||||
|
if (dialect != null) {
|
||||||
|
String result = sql.getProperty(key + "." + dialect.getKey());
|
||||||
|
if (result != null) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String result = sql.getProperty(key);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Factory implements SQLProvider.Factory {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(JDBCJournalImpl.class);
|
||||||
|
private static final String SQL_PROPERTIES_FILE = "journal-sql.properties";
|
||||||
|
// can be null if no known dialect has been identified
|
||||||
|
private SQLDialect dialect;
|
||||||
|
private final Properties sql;
|
||||||
|
|
||||||
|
public enum SQLDialect {
|
||||||
|
ORACLE("oracle", "oracle"),
|
||||||
|
POSTGRESQL("postgresql", "postgres"),
|
||||||
|
DERBY("derby", "derby"),
|
||||||
|
MYSQL("mysql", "mysql"),
|
||||||
|
DB2("db2", "db2"),
|
||||||
|
HSQL("hsql", "hsql", "hypersonic"),
|
||||||
|
H2("h2", "h2"),
|
||||||
|
MSSQL("mssql", "microsoft"),
|
||||||
|
SYBASE("jconnect", "jconnect");
|
||||||
|
|
||||||
|
private final String key;
|
||||||
|
private final String[] driverKeys;
|
||||||
|
|
||||||
|
SQLDialect(String key, String... driverKeys) {
|
||||||
|
this.key = key;
|
||||||
|
this.driverKeys = driverKeys;
|
||||||
|
}
|
||||||
|
|
||||||
|
String getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean match(String driverName) {
|
||||||
|
for (String driverKey : driverKeys) {
|
||||||
|
if (driverName.contains(driverKey)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return null if no known dialect has been identified.
|
||||||
|
*/
|
||||||
|
public static SQLDialect identifyDialect(String name) {
|
||||||
|
if (name == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
//use a lower case name to make it more resilient
|
||||||
|
final String lowerCaseName = name.toLowerCase();
|
||||||
|
return Stream.of(SQLDialect.values())
|
||||||
|
.filter(dialect -> dialect.match(lowerCaseName))
|
||||||
|
.findFirst()
|
||||||
|
.orElse(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Factory(SQLDialect dialect) {
|
||||||
|
this.dialect = dialect;
|
||||||
|
try (InputStream stream = PropertySQLProvider.class.getClassLoader().getResourceAsStream(SQL_PROPERTIES_FILE)) {
|
||||||
|
sql = new Properties();
|
||||||
|
sql.load(stream);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Unable to load properties from " + SQL_PROPERTIES_FILE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Factory(DataSource dataSource) {
|
||||||
|
this(investigateDialect(dataSource));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SQLDialect investigateDialect(Connection connection) {
|
||||||
|
SQLDialect dialect = null;
|
||||||
|
try {
|
||||||
|
DatabaseMetaData metaData = connection.getMetaData();
|
||||||
|
String dbProduct = metaData.getDatabaseProductName();
|
||||||
|
dialect = identifyDialect(dbProduct);
|
||||||
|
|
||||||
|
if (dialect == null) {
|
||||||
|
logger.debug("Attempting to guess on driver name.");
|
||||||
|
dialect = identifyDialect(metaData.getDriverName());
|
||||||
|
}
|
||||||
|
if (dialect == null) {
|
||||||
|
logger.warnf("Unable to detect database dialect from connection metadata or JDBC driver name.");
|
||||||
|
} else {
|
||||||
|
logger.debugf("Detect database dialect as '%s'.");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.debug("Unable to read JDBC metadata.", e);
|
||||||
|
}
|
||||||
|
return dialect;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SQLDialect investigateDialect(DataSource dataSource) {
|
||||||
|
try (Connection connection = dataSource.getConnection()) {
|
||||||
|
return investigateDialect(connection);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.debug("Unable to read JDBC metadata.", e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SQLDialect identifyDialect(String name) {
|
||||||
|
return SQLDialect.identifyDialect(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SQLProvider create(String tableName, DatabaseStoreType dbStoreType) {
|
||||||
|
if (dialect == SQLDialect.ORACLE) {
|
||||||
|
return new Oracle12CSQLProvider(tableName, sql, dbStoreType);
|
||||||
|
} else {
|
||||||
|
return new PropertySQLProvider(dialect, tableName, sql);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
# Generic SQL statements
|
||||||
|
|
||||||
|
create-file-table=CREATE TABLE %s (ID BIGINT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
|
||||||
|
insert-file=INSERT INTO %s (FILENAME, EXTENSION, DATA) VALUES (?,?,?)
|
||||||
|
select-filenames-by-extension=SELECT FILENAME, ID FROM %s WHERE EXTENSION=?
|
||||||
|
select-file-by-filename=SELECT ID, FILENAME, EXTENSION, DATA FROM %s WHERE fileName=?
|
||||||
|
append-to-file=SELECT DATA FROM %s WHERE ID=? FOR UPDATE
|
||||||
|
read-large-object=SELECT DATA FROM %s WHERE ID=?
|
||||||
|
delete-file=DELETE FROM %s WHERE ID=?
|
||||||
|
update-filename-by-id=UPDATE %s SET FILENAME=? WHERE ID=?
|
||||||
|
clone-file-record=INSERT INTO %s (FILENAME, EXTENSION, DATA) (SELECT FILENAME, EXTENSION, DATA FROM %s WHERE ID=?)
|
||||||
|
copy-file-record-by-id=UPDATE %1$s SET DATA = (SELECT DATA FROM %1$s WHERE ID=?) WHERE ID=?
|
||||||
|
drop-table=DROP TABLE %s
|
||||||
|
create-journal-table=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData BLOB,txCheckNoRecords INTEGER,seq BIGINT NOT NULL, PRIMARY KEY(seq))
|
||||||
|
create-journal-index=CREATE INDEX %1$s_IDX ON %1$s (id)
|
||||||
|
insert-journal-record=INSERT INTO %s(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq) VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
||||||
|
select-journal-record=SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq FROM %s ORDER BY seq ASC
|
||||||
|
delete-journal-record=DELETE FROM %s WHERE id = ?
|
||||||
|
delete-journal-tx-record=DELETE FROM %s WHERE txId=?
|
||||||
|
count-journal-record=SELECT COUNT(*) FROM %s
|
||||||
|
|
||||||
|
create-node-manager-store-table=CREATE TABLE %s (ID INT NOT NULL, HOLDER_ID VARCHAR(128), HOLDER_EXPIRATION_TIME TIMESTAMP, NODE_ID CHAR(36),STATE CHAR(1), PRIMARY KEY(ID))
|
||||||
|
create-state=INSERT INTO %s (ID) VALUES (%s)
|
||||||
|
try-acquire-lock=UPDATE %s SET HOLDER_ID = ?, HOLDER_EXPIRATION_TIME = ? WHERE (HOLDER_EXPIRATION_TIME IS NULL OR HOLDER_EXPIRATION_TIME < CURRENT_TIMESTAMP) AND ID = %s
|
||||||
|
try-release-lock=UPDATE %s SET HOLDER_ID = NULL, HOLDER_EXPIRATION_TIME = NULL WHERE HOLDER_ID = ? AND ID = %s
|
||||||
|
is-locked=SELECT HOLDER_ID, HOLDER_EXPIRATION_TIME FROM %s WHERE ID = %s
|
||||||
|
renew-lock=UPDATE %s SET HOLDER_EXPIRATION_TIME = ? WHERE HOLDER_ID = ? AND ID = %s
|
||||||
|
current-timestamp=SELECT CURRENT_TIMESTAMP FROM %s
|
||||||
|
write-state=UPDATE %s SET STATE = ? WHERE ID = %s
|
||||||
|
read-state=SELECT STATE FROM %s WHERE ID = %s
|
||||||
|
write-nodeId=UPDATE %s SET NODE_ID = ? WHERE ID = %s
|
||||||
|
read-nodeId=SELECT NODE_ID FROM %s WHERE ID = %s
|
||||||
|
initialize-nodeId=UPDATE %s SET NODE_ID = ? WHERE NODE_ID IS NULL AND ID = %s
|
||||||
|
|
||||||
|
# in KiB
|
||||||
|
max-blob-size=64512
|
||||||
|
close-connection-on-shutdown=true
|
||||||
|
# none|lower|upper
|
||||||
|
table-names-case=none
|
||||||
|
|
||||||
|
# Derby SQL statements
|
||||||
|
close-connection-on-shutdown.derby=false
|
||||||
|
create-file-table.derby=CREATE TABLE %s (ID BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
|
||||||
|
max-blob-size.derby=2147483647
|
||||||
|
table-names-case.derby=upper
|
||||||
|
|
||||||
|
# PostgreSQL SQL statements
|
||||||
|
create-file-table.postgresql=CREATE TABLE %s (ID BIGSERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))
|
||||||
|
create-journal-table.postgresql=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)
|
||||||
|
|
||||||
|
# 1 GiB
|
||||||
|
max-blob-size.postgresql=1073741824
|
||||||
|
table-names-case.postgresql=lower
|
||||||
|
|
||||||
|
# MySQL SQL statements
|
||||||
|
create-file-table.mysql=CREATE TABLE %s(ID BIGINT NOT NULL AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA LONGBLOB, PRIMARY KEY(ID)) ENGINE=InnoDB
|
||||||
|
create-journal-table.mysql=CREATE TABLE %s(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record LONGBLOB,txDataSize INTEGER,txData LONGBLOB,txCheckNoRecords INTEGER,seq BIGINT) ENGINE=InnoDB
|
||||||
|
copy-file-record-by-id.mysql=UPDATE %1$s, (SELECT DATA AS FROM_DATA FROM %1$s WHERE id=?) SELECT_COPY SET DATA=FROM_DATA WHERE id=?
|
||||||
|
# 4 GiB
|
||||||
|
max-blob-size.mysql=4294967296
|
||||||
|
table-names-case.mysql=lower
|
||||||
|
|
||||||
|
# Oracle SQL statements
|
||||||
|
create-file-table.oracle=CREATE TABLE %s(ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
|
||||||
|
create-journal-table.oracle=CREATE TABLE %s(id NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY,recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))
|
||||||
|
|
||||||
|
# 4 GiB
|
||||||
|
max-blob-size.oracle=4294967296
|
||||||
|
table-names-case.oracle=upper
|
|
@ -42,7 +42,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
|
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
|
||||||
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
|
import org.apache.activemq.artemis.jms.persistence.config.PersistedBindings;
|
||||||
|
@ -106,7 +106,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
|
||||||
if (dbConf.getDataSource() != null) {
|
if (dbConf.getDataSource() != null) {
|
||||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
||||||
}
|
}
|
||||||
localJMS = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener);
|
localJMS = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getJMSBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, ioExecutors.getExecutor(), criticalErrorListener);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
|
* The {@link SQLProvider.Factory} used to communicate with the JDBC data store.
|
||||||
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider.Factory}} will be used,
|
* It can be {@code null}. If the value is {@code null} and {@code dataSource} is set, the {@code {@link org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory}} will be used,
|
||||||
* else the type of the factory will be determined based on the {@code jdbcDriverClassName}.
|
* else the type of the factory will be determined based on the {@code jdbcDriverClassName}.
|
||||||
*
|
*
|
||||||
* @return the factory used to communicate with the JDBC data store.
|
* @return the factory used to communicate with the JDBC data store.
|
||||||
|
|
|
@ -42,7 +42,7 @@ import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
if (dbConf.getDataSource() != null) {
|
if (dbConf.getDataSource() != null) {
|
||||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
||||||
}
|
}
|
||||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(pageStoreTableNamePrefix, SQLProvider.DatabaseStoreType.PAGE), executorFactory.getExecutor(), criticalErrorListener);
|
||||||
} else {
|
} else {
|
||||||
|
@ -221,9 +221,14 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
||||||
if (writeToDirectory) directoryList.write(buffer, true);
|
if (writeToDirectory) directoryList.write(buffer, true);
|
||||||
directoryList.close();
|
directoryList.close();
|
||||||
|
|
||||||
SQLProvider sqlProvider = null;
|
final SQLProvider sqlProvider;
|
||||||
if (dbConf.getDataSource() != null) {
|
if (dbConf.getDataSource() != null) {
|
||||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory() == null ? new GenericSQLProvider.Factory() : dbConf.getSqlProviderFactory();
|
final SQLProvider.Factory sqlProviderFactory;
|
||||||
|
if (dbConf.getSqlProviderFactory() != null) {
|
||||||
|
sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||||
|
} else {
|
||||||
|
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
||||||
|
}
|
||||||
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
||||||
} else {
|
} else {
|
||||||
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName), SQLProvider.DatabaseStoreType.PAGE);
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||||
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
||||||
if (dbConf.getDataSource() != null) {
|
if (dbConf.getDataSource() != null) {
|
||||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||||
if (sqlProviderFactory == null) {
|
if (sqlProviderFactory == null) {
|
||||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
sqlProviderFactory = new PropertySQLProvider.Factory(dbConf.getDataSource());
|
||||||
}
|
}
|
||||||
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
|
||||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.UUID;
|
import org.apache.activemq.artemis.utils.UUID;
|
||||||
|
@ -58,7 +58,12 @@ public final class JdbcNodeManager extends NodeManager {
|
||||||
ExecutorFactory executorFactory,
|
ExecutorFactory executorFactory,
|
||||||
IOCriticalErrorListener ioCriticalErrorListener) {
|
IOCriticalErrorListener ioCriticalErrorListener) {
|
||||||
if (configuration.getDataSource() != null) {
|
if (configuration.getDataSource() != null) {
|
||||||
final SQLProvider.Factory sqlProviderFactory = configuration.getSqlProviderFactory() == null ? new GenericSQLProvider.Factory() : configuration.getSqlProviderFactory();
|
final SQLProvider.Factory sqlProviderFactory;
|
||||||
|
if (configuration.getSqlProviderFactory() != null) {
|
||||||
|
sqlProviderFactory = configuration.getSqlProviderFactory();
|
||||||
|
} else {
|
||||||
|
sqlProviderFactory = new PropertySQLProvider.Factory(configuration.getDataSource());
|
||||||
|
}
|
||||||
final String brokerId = java.util.UUID.randomUUID().toString();
|
final String brokerId = java.util.UUID.randomUUID().toString();
|
||||||
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
|
return usingDataSource(brokerId, configuration.getJdbcLockExpirationMillis(), configuration.getJdbcLockRenewPeriodMillis(), configuration.getJdbcLockAcquisitionTimeoutMillis(), configuration.getDataSource(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, executorFactory, ioCriticalErrorListener);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -20,11 +20,13 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.oracle.Oracle12CSQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.ORACLE;
|
||||||
|
|
||||||
public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
|
public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -39,7 +41,7 @@ public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
|
||||||
for (SQLProvider.DatabaseStoreType storeType : SQLProvider.DatabaseStoreType.values()) {
|
for (SQLProvider.DatabaseStoreType storeType : SQLProvider.DatabaseStoreType.values()) {
|
||||||
Throwable rte = null;
|
Throwable rte = null;
|
||||||
try {
|
try {
|
||||||
new Oracle12CSQLProvider.Factory().create("_A_TABLE_NAME_THAT_IS_TOO_LONG_", storeType);
|
new PropertySQLProvider.Factory(ORACLE).create("_A_TABLE_NAME_THAT_IS_TOO_LONG_", storeType);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
rte = t;
|
rte = t;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,17 +26,19 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
|
||||||
|
|
||||||
public class JdbcLeaseLockTest {
|
public class JdbcLeaseLockTest {
|
||||||
|
|
||||||
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
|
private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
|
||||||
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
||||||
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
|
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
|
||||||
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
|
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
|
||||||
private JdbcSharedStateManager jdbcSharedStateManager;
|
private JdbcSharedStateManager jdbcSharedStateManager;
|
||||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||||
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
|
||||||
|
@ -50,13 +50,15 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
|
||||||
|
|
||||||
@RunWith(Parameterized.class)
|
@RunWith(Parameterized.class)
|
||||||
public class NettyFailoverTest extends FailoverTest {
|
public class NettyFailoverTest extends FailoverTest {
|
||||||
|
|
||||||
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
|
private static final long JDBC_LOCK_EXPIRATION_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis();
|
||||||
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
|
private static final long JDBC_LOCK_RENEW_PERIOD_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis();
|
||||||
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
private static final long JDBC_LOCK_ACQUISITION_TIMEOUT_MILLIS = ActiveMQDefaultConfiguration.getDefaultJdbcLockAcquisitionTimeoutMillis();
|
||||||
private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
private static final SQLProvider SQL_PROVIDER = new PropertySQLProvider.Factory(DERBY).create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
|
||||||
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
|
private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
|
||||||
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
|
private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,8 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
import org.apache.activemq.artemis.core.journal.IOCompletion;
|
||||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||||
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
import org.apache.activemq.artemis.core.journal.RecordInfo;
|
||||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
|
||||||
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
|
||||||
|
import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
|
||||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||||
|
@ -41,6 +41,8 @@ import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
|
||||||
|
|
||||||
public class JDBCJournalTest extends ActiveMQTestBase {
|
public class JDBCJournalTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
@ -78,7 +80,7 @@ public class JDBCJournalTest extends ActiveMQTestBase {
|
||||||
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
|
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
|
||||||
executorService = Executors.newSingleThreadExecutor();
|
executorService = Executors.newSingleThreadExecutor();
|
||||||
jdbcUrl = "jdbc:derby:target/data;create=true";
|
jdbcUrl = "jdbc:derby:target/data;create=true";
|
||||||
SQLProvider.Factory factory = new DerbySQLProvider.Factory();
|
SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY);
|
||||||
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
|
journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
|
||||||
@Override
|
@Override
|
||||||
public void onIOException(Throwable code, String message, SequentialFile file) {
|
public void onIOException(Throwable code, String message, SequentialFile file) {
|
||||||
|
|
Loading…
Reference in New Issue