This commit is contained in:
Justin Bertram 2017-06-26 13:08:04 -05:00
commit 583abcefba
3 changed files with 20 additions and 18 deletions

View File

@ -23,6 +23,7 @@ import java.sql.SQLException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -43,9 +44,9 @@ public class JDBCSequentialFile implements SequentialFile {
private final String extension; private final String extension;
private boolean isOpen = false; private AtomicBoolean isOpen = new AtomicBoolean(false);
private boolean isLoaded = false; private AtomicBoolean isLoaded = new AtomicBoolean(false);
private long id = -1; private long id = -1;
@ -83,12 +84,12 @@ public class JDBCSequentialFile implements SequentialFile {
@Override @Override
public boolean isOpen() { public boolean isOpen() {
return isOpen; return isOpen.get();
} }
@Override @Override
public boolean exists() { public boolean exists() {
if (isLoaded) return true; if (isLoaded.get()) return true;
try { try {
return fileFactory.listFiles(extension).contains(filename); return fileFactory.listFiles(extension).contains(filename);
} catch (Exception e) { } catch (Exception e) {
@ -100,21 +101,20 @@ public class JDBCSequentialFile implements SequentialFile {
@Override @Override
public void open() throws Exception { public void open() throws Exception {
load(); isOpen.compareAndSet(false, load());
isOpen = true;
} }
private void load() { private boolean load() {
try { try {
synchronized (writeLock) { if (isLoaded.compareAndSet(false, true)) {
if (!isLoaded) { dbDriver.openFile(this);
dbDriver.openFile(this);
isLoaded = true;
}
} }
return true;
} catch (SQLException e) { } catch (SQLException e) {
isLoaded.set(false);
fileFactory.onIOError(e, "Error attempting to open JDBC file.", this); fileFactory.onIOError(e, "Error attempting to open JDBC file.", this);
} }
return false;
} }
@Override @Override
@ -146,8 +146,9 @@ public class JDBCSequentialFile implements SequentialFile {
public void delete() throws IOException, InterruptedException, ActiveMQException { public void delete() throws IOException, InterruptedException, ActiveMQException {
try { try {
synchronized (writeLock) { synchronized (writeLock) {
load(); if (load()) {
dbDriver.deleteFile(this); dbDriver.deleteFile(this);
}
} }
} catch (SQLException e) { } catch (SQLException e) {
fileFactory.onIOError(e, "Error deleting JDBC file.", this); fileFactory.onIOError(e, "Error deleting JDBC file.", this);
@ -156,6 +157,7 @@ public class JDBCSequentialFile implements SequentialFile {
private synchronized int internalWrite(byte[] data, IOCallback callback) { private synchronized int internalWrite(byte[] data, IOCallback callback) {
try { try {
open();
synchronized (writeLock) { synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data); int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes); seek(noBytes);
@ -281,7 +283,7 @@ public class JDBCSequentialFile implements SequentialFile {
@Override @Override
public void close() throws Exception { public void close() throws Exception {
isOpen = false; isOpen.set(false);
sync(); sync();
fileFactory.sequentialFileClosed(this); fileFactory.sequentialFileClosed(this);
} }

View File

@ -165,7 +165,7 @@ public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
if (blob != null) { if (blob != null) {
file.setWritePosition(blob.length()); file.setWritePosition(blob.length());
} else { } else {
logger.warn("ERROR NO BLOB FOR FILE" + "File: " + file.getFileName() + " " + file.getId()); logger.trace("No Blob found for file: " + file.getFileName() + " " + file.getId());
} }
} }
connection.commit(); connection.commit();

View File

@ -66,12 +66,12 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
} }
bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
} else { } else {
String driverClassName = dbConf.getJdbcDriverClassName(); String driverClassName = dbConf.getJdbcDriverClassName();
bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName(), SQLProvider.DatabaseStoreType.BINDINGS_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener); messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName(), SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorFactory.getExecutor(), criticalErrorListener);
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executor, criticalErrorListener); largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName(), SQLProvider.DatabaseStoreType.LARGE_MESSAGE), executorFactory.getExecutor(), criticalErrorListener);
} }
final int networkTimeout = dbConf.getJdbcNetworkTimeout(); final int networkTimeout = dbConf.getJdbcNetworkTimeout();
if (networkTimeout >= 0) { if (networkTimeout >= 0) {