ARTEMIS-513 Add JDBC Sequential File Factory Impl

Martyn Taylor 2016-05-03 14:36:10 +01:00
parent 2a415a80e9
commit c9b953433e
9 changed files with 1107 additions and 15 deletions
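The new factory stores each sequential file as a single row in a database table (ID, FILENAME, EXTENSION, DATA) and appends to a file by concatenating onto the BLOB column. Below is a minimal usage sketch, mirroring the test added in this commit; it assumes the embedded Derby driver is on the classpath, and the class name, table name and file name are illustrative only.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;

public class JDBCFileUsageSketch {
   public static void main(String[] args) throws Exception {
      Executor executor = Executors.newSingleThreadExecutor();

      // Connection URL, table name and driver class are the same values the test below uses.
      JDBCSequentialFileFactory factory = new JDBCSequentialFileFactory(
         "jdbc:derby:target/data;create=true", "FILES",
         "org.apache.derby.jdbc.EmbeddedDriver", executor);
      factory.start(); // connects and creates the FILES table if it does not exist

      SequentialFile file = factory.createSequentialFile("example.txt");
      file.open(); // inserts the backing row on first open

      ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(16);
      buffer.writeBytes("hello".getBytes());
      file.write(buffer, true); // sync is ignored; the append is scheduled on the executor

      file.close();
      factory.stop();
   }
}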

@@ -23,6 +23,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
public class JDBCUtils {
public static Driver getDriver(String className) throws Exception {
@@ -60,4 +64,13 @@ public class JDBCUtils {
statement.executeUpdate(sql);
}
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
if (driverClass.contains("derby")) {
return new DerbySQLProvider(tableName);
}
else {
return new GenericSQLProvider(tableName);
}
}
}

@@ -0,0 +1,419 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public class JDBCSequentialFile implements SequentialFile {
private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
private final String filename;
private final String extension;
private boolean isOpen = false;
private boolean isCreated = false;
private int id = -1;
private final PreparedStatement appendToFile;
private final PreparedStatement deleteFile;
private final PreparedStatement readFile;
private final PreparedStatement createFile;
private final PreparedStatement selectFileByFileName;
private final PreparedStatement copyFileRecord;
private final PreparedStatement renameFile;
private long readPosition = 0;
private long writePosition = 0;
private Executor executor;
private JDBCSequentialFileFactory fileFactory;
private int maxSize;
private SQLProvider sqlProvider;
private final Object writeLock;
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final SQLProvider sqlProvider,
final Executor executor,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
this.executor = executor;
this.maxSize = sqlProvider.getMaxBlobSize();
this.sqlProvider = sqlProvider;
this.writeLock = writeLock;
Connection connection = fileFactory.getConnection();
this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
public boolean exists() {
return isCreated;
}
@Override
public synchronized void open() throws Exception {
if (!isOpen) {
try {
synchronized (writeLock) {
selectFileByFileName.setString(1, filename);
try (ResultSet rs = selectFileByFileName.executeQuery()) {
if (!rs.next()) {
createFile.setString(1, filename);
createFile.setString(2, extension);
createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
this.id = keys.getInt(1);
}
}
else {
this.id = rs.getInt(1);
this.writePosition = rs.getBlob(4).length();
}
}
}
isCreated = true;
isOpen = true;
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Error retrieving file record", e);
isOpen = false;
}
}
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
open();
}
@Override
public boolean fits(int size) {
return writePosition + size <= maxSize;
}
@Override
public int getAlignment() throws Exception {
return 0;
}
@Override
public int calculateBlockStart(int position) throws Exception {
return 0;
}
@Override
public String getFileName() {
return filename;
}
@Override
public void fill(int size) throws Exception {
// Do nothing
}
@Override
public void delete() throws IOException, InterruptedException, ActiveMQException {
try {
if (isCreated) {
deleteFile.setInt(1, id);
deleteFile.executeUpdate();
}
}
catch (SQLException e) {
throw new IOException(e);
}
}
private synchronized int internalWrite(byte[] data, IOCallback callback) {
try {
synchronized (writeLock) {
int noBytes = data.length;
appendToFile.setBytes(1, data);
appendToFile.setInt(2, id);
int result = appendToFile.executeUpdate();
if (result < 1)
throw new ActiveMQException("No record found for file id: " + id);
seek(noBytes);
if (callback != null)
callback.done();
return noBytes;
}
}
catch (Exception e) {
logger.error("Error appending data to file " + filename, e);
if (callback != null)
callback.onError(-1, e.getMessage());
}
return -1;
}
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback);
}
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback);
}
public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
internalWrite(bytes, callback);
}
});
}
public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
internalWrite(bytes, callback);
}
});
}
synchronized void seek(long noBytes) {
writePosition += noBytes;
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback);
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
write(bytes, sync, null);
}
@Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
bytes.encode(data);
scheduleWrite(data, callback);
}
@Override
public void write(EncodingSupport bytes, boolean sync) throws Exception {
write(bytes, sync, null);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
if (callback == null) {
SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
try {
scheduleWrite(bytes, waitIOCallback);
waitIOCallback.waitCompletion();
}
catch (Exception e) {
waitIOCallback.onError(-1, e.getMessage());
}
}
else {
scheduleWrite(bytes, callback);
}
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
writeDirect(bytes, sync, null);
// Are we meant to block here?
}
@Override
public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
synchronized (writeLock) {
readFile.setInt(1, id);
try (ResultSet rs = readFile.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
long bytesRemaining = blob.length() - readPosition;
byte[] data;
if (bytesRemaining > bytes.remaining()) {
// First index into blob is 1 (not 0)
data = blob.getBytes(readPosition + 1, bytes.remaining());
}
else {
// First index into blob is 1 (not 0)
data = blob.getBytes(readPosition + 1, (int) bytesRemaining);
}
bytes.put(data);
readPosition += data.length;
if (callback != null)
callback.done();
return data.length;
}
return 0;
}
catch (Exception e) {
if (callback != null)
callback.onError(-1, e.getMessage());
return 0;
}
}
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return read(bytes, null);
}
@Override
public void position(long pos) throws IOException {
readPosition = pos;
}
@Override
public long position() {
return readPosition;
}
@Override
public synchronized void close() throws Exception {
isOpen = false;
}
@Override
public void sync() throws IOException {
// (mtaylor) We always write straight away, so we don't need to do anything here.
// (mtaylor) Is this meant to be blocking?
}
@Override
public long size() throws Exception {
return writePosition;
}
@Override
public void renameTo(String newFileName) throws Exception {
renameFile.setString(1, newFileName);
renameFile.setInt(2, id);
renameFile.executeUpdate();
}
@Override
public SequentialFile cloneFile() {
try {
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
return clone;
}
catch (Exception e) {
logger.error("Error cloning file: " + filename, e);
return null;
}
}
@Override
public void copyTo(SequentialFile cloneFile) throws Exception {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
clone.open();
copyFileRecord.setInt(1, id);
copyFileRecord.setInt(2, clone.getId());
copyFileRecord.executeUpdate();
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFilename() {
return filename;
}
public String getExtension() {
return extension;
}
// Only used by the Journal, no need to implement.
@Override
public void setTimedBuffer(TimedBuffer buffer) {
}
// Only used by replication, no need to implement.
@Override
public File getJavaFile() {
return null;
}
}
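All writes above are pushed onto the factory's executor and the sync flag is ignored, so a caller that must block until the append has reached the database can pass its own callback, the same way writeDirect wraps a SimpleWaitIOCallback when given none. A small sketch under that assumption; the helper class and method names are illustrative only.

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;

public final class BlockingAppendSketch {

   // Appends payload to an already-open file and blocks until the BLOB update completes.
   static void appendAndWait(JDBCSequentialFile file, byte[] payload) throws Exception {
      ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(payload.length);
      data.writeBytes(payload);

      SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
      file.write(data, true, callback); // scheduled asynchronously on the factory's executor
      callback.waitCompletion();        // returns once done() fires; throws if onError() was called
   }
}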

@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
private Connection connection;
private String connectionUrl;
private final Driver driver;
private boolean started;
private final String tableName;
private List<JDBCSequentialFile> files;
private PreparedStatement selectFileNamesByExtension;
private Executor executor;
private SQLProvider sqlProvider;
private Map<String, Object> fileLocks = new HashMap<>();
public JDBCSequentialFileFactory(final String connectionUrl,
final String tableName,
final String className,
Executor executor) throws Exception {
this.connectionUrl = connectionUrl;
this.executor = executor;
this.tableName = tableName.toUpperCase();
files = new ArrayList<>();
driver = JDBCUtils.getDriver(className);
sqlProvider = JDBCUtils.getSQLProvider(driver.getClass().getCanonicalName(), tableName);
}
public Connection getConnection() {
return connection;
}
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
fileLocks.putIfAbsent(fileName, new Object());
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName));
files.add(file);
return file;
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
}
return null;
}
@Override
public int getMaxIO() {
return 1;
}
@Override
public List<String> listFiles(String extension) throws Exception {
List<String> fileNames = new ArrayList<>();
selectFileNamesByExtension.setString(1, extension);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
while (rs.next()) {
fileNames.add(rs.getString(1));
}
}
return fileNames;
}
@Override
public boolean isSupportsCallbacks() {
return true;
}
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do in this case; we can just have good faith in the GC
}
@Override
public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocate(size);
}
@Override
public void clearBuffer(final ByteBuffer buffer) {
final int limit = buffer.limit();
buffer.rewind();
for (int i = 0; i < limit; i++) {
buffer.put((byte) 0);
}
buffer.rewind();
}
@Override
public ByteBuffer wrapBuffer(final byte[] bytes) {
return ByteBuffer.wrap(bytes);
}
@Override
public int getAlignment() {
return 1;
}
@Override
public int calculateBlockSize(final int bytes) {
return bytes;
}
@Override
public void deactivateBuffer() {
}
@Override
public void releaseBuffer(final ByteBuffer buffer) {
}
@Override
public void activateBuffer(SequentialFile file) {
}
@Override
public File getDirectory() {
return null;
}
@Override
public synchronized void start() {
try {
if (!started) {
connection = driver.connect(connectionUrl, new Properties());
JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
started = true;
}
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
started = false;
}
}
@Override
public synchronized void stop() {
try {
if (started)
connection.close();
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
}
started = false;
}
@Override
public boolean isStarted() {
return started;
}
@Override
public void createDirs() throws Exception {
}
@Override
public void flush() {
}
public synchronized void destroy() throws SQLException {
Statement statement = connection.createStatement();
statement.executeUpdate(sqlProvider.getDropFileTableSQL());
stop();
}
}

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public class DerbySQLProvider extends GenericSQLProvider {
// Derby max blob size = 2G
private static final int MAX_BLOB_SIZE = 2147483647;
private final String createFileTableSQL;
private final String appendToFileSQL;
public DerbySQLProvider(String tableName) {
super(tableName);
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
"FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getAppendToFileSQL() {
return appendToFileSQL;
}
}

@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public class GenericSQLProvider implements SQLProvider {
// Default to the lowest common limit (MySQL = 64k)
private static final int MAX_BLOB_SIZE = 64512;
private final String tableName;
private final String createFileTableSQL;
private final String insertFileSQL;
private final String selectFileNamesByExtensionSQL;
private final String selectIdByFileNameSQL;
private final String appendToFileSQL;
private final String readFileSQL;
private final String deleteFileSQL;
private final String updateFileNameByIdSQL;
private final String copyFileRecordByIdSQL;
private final String cloneFileRecordSQL;
private final String dropFileTableSQL;
public GenericSQLProvider(String tableName) {
this.tableName = tableName;
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
insertFileSQL = "INSERT INTO " + tableName +
" (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
"(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
dropFileTableSQL = "DROP TABLE " + tableName;
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
@Override
public String getTableName() {
return tableName;
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getInsertFileSQL() {
return insertFileSQL;
}
@Override
public String getSelectFileByFileName() {
return selectIdByFileNameSQL;
}
@Override
public String getSelectFileNamesByExtensionSQL() {
return selectFileNamesByExtensionSQL;
}
@Override
public String getAppendToFileSQL() {
return appendToFileSQL;
}
@Override
public String getReadFileSQL() {
return readFileSQL;
}
@Override
public String getDeleteFileSQL() {
return deleteFileSQL;
}
@Override
public String getUpdateFileNameByIdSQL() {
return updateFileNameByIdSQL;
}
@Override
public String getCopyFileRecordByIdSQL() {
return copyFileRecordByIdSQL;
}
@Override
public String getCloneFileRecordByIdSQL() {
return cloneFileRecordSQL;
}
@Override
public String getDropFileTableSQL() {
return dropFileTableSQL;
}
}

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public interface SQLProvider {
int getMaxBlobSize();
String getTableName();
String getCreateFileTableSQL();
String getInsertFileSQL();
String getSelectFileNamesByExtensionSQL();
String getSelectFileByFileName();
String getAppendToFileSQL();
String getReadFileSQL();
String getDeleteFileSQL();
String getUpdateFileNameByIdSQL();
String getCopyFileRecordByIdSQL();
String getDropFileTableSQL();
String getCloneFileRecordByIdSQL();
}
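DerbySQLProvider above is the only dialect-specific implementation added here; every other driver falls back to GenericSQLProvider via JDBCUtils.getSQLProvider. As a hypothetical sketch of how a further dialect could be plugged in following the same pattern (the class name, size limit and append statement below are illustrative assumptions, not part of this change):

package org.apache.activemq.artemis.jdbc.store.file.sql;

// Hypothetical dialect override: only the statements and limits that differ
// from GenericSQLProvider are replaced, as DerbySQLProvider does.
public class ExampleSQLProvider extends GenericSQLProvider {

   // Illustrative value only; a real dialect would use the database's documented BLOB limit.
   private static final int MAX_BLOB_SIZE = 1024 * 1024;

   private final String appendToFileSQL;

   public ExampleSQLProvider(String tableName) {
      super(tableName);
      // Illustrative append statement; the concatenation operator is database specific.
      appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
   }

   @Override
   public int getMaxBlobSize() {
      return MAX_BLOB_SIZE;
   }

   @Override
   public String getAppendToFileSQL() {
      return appendToFileSQL;
   }
}

Wiring such a provider in would also need a matching branch in JDBCUtils.getSQLProvider, which currently only recognises Derby.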

@@ -229,12 +229,13 @@ public class JDBCJournalImpl implements Journal {
/* We store the Transaction reference in memory. Once all records associated with a Transaction are deleted,
we remove the Tx records (i.e. PREPARE, COMMIT). */
private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values());
for (Long txId : committedTx) {
transactions.get(txId).committed = true;
}
@@ -319,7 +320,7 @@ public class JDBCJournalImpl implements Journal {
if (callback != null) callback.waitCompletion();
}
private void addTxRecord(JDBCJournalRecord record) {
private synchronized void addTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
if (txHolder == null) {
txHolder = new TransactionHolder(record.getTxId());
@@ -341,7 +342,7 @@ public class JDBCJournalImpl implements Journal {
}
}
private void removeTxRecord(JDBCJournalRecord record) {
private synchronized void removeTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
// We actually only need the record ID in this instance.

@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.file;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class JDBCSequentialFileFactoryTest {
private static String connectionUrl = "jdbc:derby:target/data;create=true";
private static String tableName = "FILES";
private static String className = EmbeddedDriver.class.getCanonicalName();
private JDBCSequentialFileFactory factory;
@Before
public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor();
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
factory.start();
}
@After
public void tearDown() throws SQLException {
factory.destroy();
}
@Test
public void testJDBCFileFactoryStarted() throws Exception {
assertTrue(factory.isStarted());
}
@Test
public void testCreateFiles() throws Exception {
int noFiles = 100;
Set<String> fileNames = new HashSet<String>();
for (int i = 0; i < noFiles; i++) {
String fileName = UUID.randomUUID().toString() + ".txt";
fileNames.add(fileName);
SequentialFile file = factory.createSequentialFile(fileName);
// Files are created on open()
file.open();
}
List<String> queryFileNames = factory.listFiles("txt");
assertTrue(queryFileNames.containsAll(fileNames));
}
@Test
public void testAsyncAppendToFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 1);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
callback.assertEmpty(5);
checkData(file, src);
}
@Test
public void testCopyFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 5);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
file.copyTo(copy);
checkData(copy, src);
checkData(file, src);
}
@Test
public void testCloneFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 5);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
}
private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException {
expectedData.resetReaderIndex();
byte[] resultingBytes = new byte[expectedData.readableBytes()];
ByteBuffer byteBuffer = ByteBuffer.allocate(expectedData.readableBytes());
file.read(byteBuffer, null);
expectedData.getBytes(0, resultingBytes);
assertArrayEquals(resultingBytes, byteBuffer.array());
}
private class IOCallbackCountdown implements IOCallback {
private final CountDownLatch countDownLatch;
public IOCallbackCountdown(int size) {
this.countDownLatch = new CountDownLatch(size);
}
@Override
public void done() {
countDownLatch.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
fail(errorMessage);
}
public void assertEmpty(int timeout) throws InterruptedException {
countDownLatch.await(timeout, TimeUnit.SECONDS);
assertEquals(0, countDownLatch.getCount());
}
}
}

@@ -65,18 +65,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
}
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
@Override
public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
public static ByteBuffer allocateDirectByteBuffer(final int size) {
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
ByteBuffer buffer2 = null;
try {
@@ -104,6 +93,21 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return buffer2;
}
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
@Override
public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do in this case; we can just have good faith in the GC