This closes #1050
This commit is contained in:
commit
c71063d2ac
|
@ -20,6 +20,7 @@ import java.sql.SQLException;
|
|||
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.mysql.MySQLSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.oracle.Oracle12CSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
|
@ -40,6 +41,9 @@ public class JDBCUtils {
|
|||
} else if (url.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for url::%s", url);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else if (url.contains("oracle")) {
|
||||
logger.tracef("getSQLProvider Returning Oracle12C SQL provider for url::%s", url);
|
||||
factory = new Oracle12CSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for url::%s", url);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
|
@ -58,6 +62,9 @@ public class JDBCUtils {
|
|||
} else if (driverClass.contains("mysql")) {
|
||||
logger.tracef("getSQLProvider Returning mysql SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new MySQLSQLProvider.Factory();
|
||||
} else if (driverClass.contains("oracle")) {
|
||||
logger.tracef("getSQLProvider Returning Oracle12C SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new Oracle12CSQLProvider.Factory();
|
||||
} else {
|
||||
logger.tracef("getSQLProvider Returning generic SQL provider for driver::%s, tableName::%s", driverClass, tableName);
|
||||
factory = new GenericSQLProvider.Factory();
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jdbc.store.drivers.oracle;
|
||||
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
|
||||
public class Oracle12CSQLProvider extends GenericSQLProvider {
|
||||
|
||||
/**
|
||||
create-file-table.oracle=CREATE TABLE %s(ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))
|
||||
create-journal-table.oracle=CREATE TABLE %s(id NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY,recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))
|
||||
max-blob-size.oracle=4294967296 **/
|
||||
|
||||
private final String createFileTableSQL = "CREATE TABLE " + tableName + " (ID NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
|
||||
|
||||
private final String createJournalTableSQL = "CREATE TABLE " + tableName + " (id NUMBER(19) GENERATED BY DEFAULT ON NULL AS IDENTITY,recordType NUMBER(5),compactCount NUMBER(5),txId NUMBER(19),userRecordType NUMBER(5),variableSize NUMBER(10),record BLOB,txDataSize NUMBER(10),txData BLOB,txCheckNoRecords NUMBER(10),seq NUMBER(19))";
|
||||
|
||||
private static final long MAX_BLOB_SIZE = 4294967296L; //4GB
|
||||
|
||||
protected Oracle12CSQLProvider(String tableName) {
|
||||
super(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxBlobSize() {
|
||||
return MAX_BLOB_SIZE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCreateFileTableSQL() {
|
||||
return createFileTableSQL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getCreateJournalTableSQL() {
|
||||
return new String[] {createJournalTableSQL};
|
||||
}
|
||||
|
||||
public static class Factory implements SQLProvider.Factory {
|
||||
|
||||
@Override
|
||||
public SQLProvider create(String tableName) {
|
||||
return new Oracle12CSQLProvider(tableName);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -155,7 +155,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
try (ResultSet rs = readLargeObject.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
Blob blob = rs.getBlob(1);
|
||||
file.setWritePosition((int) blob.length());
|
||||
if (blob != null) {
|
||||
file.setWritePosition((int) blob.length());
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
} catch (SQLException e) {
|
||||
|
@ -250,6 +252,9 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
try (ResultSet rs = appendToLargeObject.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
Blob blob = rs.getBlob(1);
|
||||
if (blob == null) {
|
||||
blob = connection.createBlob();
|
||||
}
|
||||
bytesWritten = blob.setBytes(blob.length() + 1, data);
|
||||
rs.updateBlob(1, blob);
|
||||
rs.updateRow();
|
||||
|
@ -279,9 +284,11 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
|
|||
try (ResultSet rs = readLargeObject.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
final Blob blob = rs.getBlob(1);
|
||||
readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
|
||||
byte[] data = blob.getBytes(file.position() + 1, readLength);
|
||||
bytes.put(data);
|
||||
if (blob != null) {
|
||||
readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
|
||||
byte[] data = blob.getBytes(file.position() + 1, readLength);
|
||||
bytes.put(data);
|
||||
}
|
||||
}
|
||||
connection.commit();
|
||||
return readLength;
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriv
|
|||
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
|
||||
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
/**
|
||||
* Integration point between Paging and JDBC
|
||||
|
@ -80,6 +79,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
|
||||
private JDBCSequentialFile directoryList;
|
||||
|
||||
private boolean started = false;
|
||||
|
||||
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,
|
||||
final StorageManager storageManager,
|
||||
final long syncTimeout,
|
||||
|
@ -93,27 +94,33 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
this.scheduledExecutor = scheduledExecutor;
|
||||
this.syncTimeout = syncTimeout;
|
||||
this.dbConf = dbConf;
|
||||
|
||||
if (dbConf.getDataSource() != null) {
|
||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||
if (sqlProviderFactory == null) {
|
||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
}
|
||||
pagingFactoryFileFactory.start();
|
||||
directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||
directoryList.open();
|
||||
start();
|
||||
}
|
||||
|
||||
public synchronized void start() throws Exception {
|
||||
if (!started) {
|
||||
if (dbConf.getDataSource() != null) {
|
||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory();
|
||||
if (sqlProviderFactory == null) {
|
||||
sqlProviderFactory = new GenericSQLProvider.Factory();
|
||||
}
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
} else {
|
||||
String driverClassName = dbConf.getJdbcDriverClassName();
|
||||
pagingFactoryFileFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getPageStoreTableName()), executorFactory.getExecutor());
|
||||
}
|
||||
pagingFactoryFileFactory.start();
|
||||
started = true;
|
||||
}
|
||||
}
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
pagingFactoryFileFactory.stop();
|
||||
public synchronized void stop() {
|
||||
if (started) {
|
||||
pagingFactoryFileFactory.stop();
|
||||
started = false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -136,8 +143,8 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
|
||||
@Override
|
||||
public synchronized SequentialFileFactory newFileFactory(final SimpleString address) throws Exception {
|
||||
String guid = UUIDGenerator.getInstance().generateStringUUID();
|
||||
SequentialFileFactory factory = newFileFactory(guid, true);
|
||||
String tableName = "" + storageManager.generateID();
|
||||
SequentialFileFactory factory = newFileFactory(tableName, true);
|
||||
factory.start();
|
||||
|
||||
SequentialFile file = factory.createSequentialFile(PagingStoreFactoryDatabase.ADDRESS_FILE);
|
||||
|
@ -146,6 +153,7 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(SimpleString.sizeofNullableString(address));
|
||||
buffer.writeSimpleString(address);
|
||||
file.write(buffer, true);
|
||||
file.close();
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
@ -157,15 +165,18 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
@Override
|
||||
public synchronized List<PagingStore> reloadStores(final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
|
||||
// We assume the directory list < Integer.MAX_VALUE (this is only a list of addresses).
|
||||
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||
directoryList.open();
|
||||
|
||||
int size = ((Long) directoryList.size()).intValue();
|
||||
ActiveMQBuffer buffer = readActiveMQBuffer(directoryList, size);
|
||||
|
||||
ArrayList<PagingStore> storesReturn = new ArrayList<>();
|
||||
|
||||
while (buffer.readableBytes() > 0) {
|
||||
SimpleString guid = buffer.readSimpleString();
|
||||
SimpleString table = buffer.readSimpleString();
|
||||
|
||||
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(guid.toString(), false);
|
||||
JDBCSequentialFileFactory factory = (JDBCSequentialFileFactory) newFileFactory(table.toString(), false);
|
||||
factory.start();
|
||||
|
||||
JDBCSequentialFile addressFile = (JDBCSequentialFile) factory.createSequentialFile(ADDRESS_FILE);
|
||||
|
@ -185,15 +196,28 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
|
|||
|
||||
storesReturn.add(store);
|
||||
}
|
||||
directoryList.close();
|
||||
return storesReturn;
|
||||
}
|
||||
|
||||
private synchronized SequentialFileFactory newFileFactory(final String directoryName, boolean writeToDirectory) throws Exception {
|
||||
JDBCSequentialFile directoryList = (JDBCSequentialFile) pagingFactoryFileFactory.createSequentialFile(DIRECTORY_NAME);
|
||||
directoryList.open();
|
||||
SimpleString simpleString = SimpleString.toSimpleString(directoryName);
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(simpleString.sizeof());
|
||||
buffer.writeSimpleString(simpleString);
|
||||
if (writeToDirectory) directoryList.write(buffer, true);
|
||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName)), executorFactory.getExecutor());
|
||||
directoryList.close();
|
||||
|
||||
SQLProvider sqlProvider = null;
|
||||
if (dbConf.getDataSource() != null) {
|
||||
SQLProvider.Factory sqlProviderFactory = dbConf.getSqlProviderFactory() == null ? new GenericSQLProvider.Factory() : dbConf.getSqlProviderFactory();
|
||||
sqlProvider = sqlProviderFactory.create(getTableNameForGUID(directoryName));
|
||||
} else {
|
||||
sqlProvider = JDBCUtils.getSQLProvider(dbConf.getJdbcDriverClassName(), getTableNameForGUID(directoryName));
|
||||
}
|
||||
|
||||
return new JDBCSequentialFileFactory(pagingFactoryFileFactory.getDbDriver().getConnection(), sqlProvider, executorFactory.getExecutor());
|
||||
}
|
||||
|
||||
private String getTableNameForGUID(String guid) {
|
||||
|
|
Loading…
Reference in New Issue