This closes #502

This commit is contained in:
Clebert Suconic 2016-05-04 13:36:44 -04:00
commit e458a4327c
30 changed files with 1369 additions and 142 deletions

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
* Default values of ActiveMQ Artemis configuration parameters.
*/
public final class ActiveMQDefaultConfiguration {
/*
/*
* <p> In order to avoid compile time in-lining of constants, all access is done through methods
* and all fields are PRIVATE STATIC but not FINAL. This is done following the recommendation at
* <a href="http://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html#jls-13.4.9">13.4.9.
@ -414,6 +414,9 @@ public final class ActiveMQDefaultConfiguration {
// Default bindings table name, used with Database storage type
private static String DEFAULT_BINDINGS_TABLE_NAME = "BINDINGS";
// Default large messages table name, used with Database storage type
private static final String DEFAULT_LARGE_MESSAGES_TABLE_NAME = "LARGE_MESSAGES";
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1103,4 +1106,8 @@ public final class ActiveMQDefaultConfiguration {
public static String getDefaultDriverClassName() {
return DEFAULT_JDBC_DRIVER_CLASS_NAME;
}
public static String getDefaultLargeMessagesTableName() {
return DEFAULT_LARGE_MESSAGES_TABLE_NAME;
}
}

View File

@ -23,6 +23,10 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.file.sql.DerbySQLProvider;
import org.apache.activemq.artemis.jdbc.store.file.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
public class JDBCUtils {
public static Driver getDriver(String className) throws Exception {
@ -60,4 +64,13 @@ public class JDBCUtils {
statement.executeUpdate(sql);
}
}
public static SQLProvider getSQLProvider(String driverClass, String tableName) {
if (driverClass.contains("derby")) {
return new DerbySQLProvider(tableName);
}
else {
return new GenericSQLProvider(tableName);
}
}
}

View File

@ -0,0 +1,419 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public class JDBCSequentialFile implements SequentialFile {
private static final Logger logger = Logger.getLogger(JDBCSequentialFile.class);
private final String filename;
private final String extension;
private boolean isOpen = false;
private boolean isCreated = false;
private int id = -1;
private final PreparedStatement appendToFile;
private final PreparedStatement deleteFile;
private final PreparedStatement readFile;
private final PreparedStatement createFile;
private final PreparedStatement selectFileByFileName;
private final PreparedStatement copyFileRecord;
private final PreparedStatement renameFile;
private long readPosition = 0;
private long writePosition = 0;
private Executor executor;
private JDBCSequentialFileFactory fileFactory;
private int maxSize;
private SQLProvider sqlProvider;
private final Object writeLock;
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final SQLProvider sqlProvider,
final Executor executor,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
this.extension = filename.contains(".") ? filename.substring(filename.lastIndexOf(".") + 1, filename.length()) : "";
this.executor = executor;
this.maxSize = sqlProvider.getMaxBlobSize();
this.sqlProvider = sqlProvider;
this.writeLock = writeLock;
Connection connection = fileFactory.getConnection();
this.appendToFile = connection.prepareStatement(sqlProvider.getAppendToFileSQL());
this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
this.readFile = connection.prepareStatement(sqlProvider.getReadFileSQL());
this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
public boolean exists() {
return isCreated;
}
@Override
public synchronized void open() throws Exception {
if (!isOpen) {
try {
synchronized (writeLock) {
selectFileByFileName.setString(1, filename);
try (ResultSet rs = selectFileByFileName.executeQuery()) {
if (!rs.next()) {
createFile.setString(1, filename);
createFile.setString(2, extension);
createFile.setBytes(3, new byte[0]);
createFile.executeUpdate();
try (ResultSet keys = createFile.getGeneratedKeys()) {
keys.next();
this.id = keys.getInt(1);
}
}
else {
this.id = rs.getInt(1);
this.writePosition = rs.getBlob(4).length();
}
}
}
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Error retreiving file record", e);
isOpen = false;
}
isCreated = true;
isOpen = true;
}
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
open();
}
@Override
public boolean fits(int size) {
return writePosition + size <= maxSize;
}
@Override
public int getAlignment() throws Exception {
return 0;
}
@Override
public int calculateBlockStart(int position) throws Exception {
return 0;
}
@Override
public String getFileName() {
return filename;
}
@Override
public void fill(int size) throws Exception {
// Do nothing
}
@Override
public void delete() throws IOException, InterruptedException, ActiveMQException {
try {
if (isCreated) {
deleteFile.setInt(1, id);
deleteFile.executeUpdate();
}
}
catch (SQLException e) {
throw new IOException(e);
}
}
private synchronized int internalWrite(byte[] data, IOCallback callback) {
try {
synchronized (writeLock) {
int noBytes = data.length;
appendToFile.setBytes(1, data);
appendToFile.setInt(2, id);
int result = appendToFile.executeUpdate();
if (result < 1)
throw new ActiveMQException("No record found for file id: " + id);
seek(noBytes);
if (callback != null)
callback.done();
return noBytes;
}
}
catch (Exception e) {
e.printStackTrace();
if (callback != null)
callback.onError(-1, e.getMessage());
}
return -1;
}
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback);
}
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback);
}
public void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
internalWrite(bytes, callback);
}
});
}
public void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
internalWrite(bytes, callback);
}
});
}
synchronized void seek(long noBytes) {
writePosition += noBytes;
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
// We ignore sync since we schedule writes straight away.
scheduleWrite(bytes, callback);
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
write(bytes, sync, null);
}
@Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
ActiveMQBuffer data = ActiveMQBuffers.fixedBuffer(bytes.getEncodeSize());
bytes.encode(data);
scheduleWrite(data, callback);
}
@Override
public void write(EncodingSupport bytes, boolean sync) throws Exception {
write(bytes, sync, null);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
if (callback == null) {
SimpleWaitIOCallback waitIOCallback = new SimpleWaitIOCallback();
try {
scheduleWrite(bytes, waitIOCallback);
waitIOCallback.waitCompletion();
}
catch (Exception e) {
waitIOCallback.onError(-1, e.getMessage());
}
}
else {
scheduleWrite(bytes, callback);
}
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
writeDirect(bytes, sync, null);
// Are we meant to block here?
}
@Override
public synchronized int read(ByteBuffer bytes, IOCallback callback) throws SQLException {
synchronized (writeLock) {
readFile.setInt(1, id);
try (ResultSet rs = readFile.executeQuery()) {
if (rs.next()) {
Blob blob = rs.getBlob(1);
long bytesRemaining = blob.length() - readPosition;
byte[] data;
if (bytesRemaining > bytes.remaining()) {
// First index into blob is 1 (not 0)
data = blob.getBytes(readPosition + 1, bytes.remaining());
}
else {
// First index into blob is 1 (not 0)
data = blob.getBytes(readPosition + 1, (int) bytesRemaining);
}
bytes.put(data);
readPosition += data.length;
if (callback != null)
callback.done();
return data.length;
}
return 0;
}
catch (Exception e) {
if (callback != null)
callback.onError(-1, e.getMessage());
return 0;
}
}
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return read(bytes, null);
}
@Override
public void position(long pos) throws IOException {
readPosition = pos;
}
@Override
public long position() {
return readPosition;
}
@Override
public synchronized void close() throws Exception {
isOpen = false;
}
@Override
public void sync() throws IOException {
// (mtaylor) We always write straight away, so we don't need to do anything here.
// (mtaylor) Is this meant to be blocking?
}
@Override
public long size() throws Exception {
return writePosition;
}
@Override
public void renameTo(String newFileName) throws Exception {
renameFile.setString(1, newFileName);
renameFile.setInt(2, id);
renameFile.executeUpdate();
}
@Override
public SequentialFile cloneFile() {
try {
JDBCSequentialFile clone = new JDBCSequentialFile(fileFactory, filename, sqlProvider, executor, writeLock);
return clone;
}
catch (Exception e) {
logger.error("Error cloning file: " + filename, e);
return null;
}
}
@Override
public void copyTo(SequentialFile cloneFile) throws Exception {
JDBCSequentialFile clone = (JDBCSequentialFile) cloneFile;
clone.open();
copyFileRecord.setInt(1, id);
copyFileRecord.setInt(2, clone.getId());
copyFileRecord.executeUpdate();
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFilename() {
return filename;
}
public String getExtension() {
return extension;
}
// Only Used by Journal, no need to implement.
@Override
public void setTimedBuffer(TimedBuffer buffer) {
}
// Only Used by replication, no need to implement.
@Override
public File getJavaFile() {
return null;
}
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file;
import java.io.File;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.file.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
private Connection connection;
private String connectionUrl;
private final Driver driver;
private boolean started;
private final String tableName;
private List<JDBCSequentialFile> files;
private PreparedStatement selectFileNamesByExtension;
private Executor executor;
private SQLProvider sqlProvider;
private Map<String, Object> fileLocks = new HashMap<>();
public JDBCSequentialFileFactory(final String connectionUrl,
final String tableName,
final String className,
Executor executor) throws Exception {
this.connectionUrl = connectionUrl;
this.executor = executor;
this.tableName = tableName.toUpperCase();
files = new ArrayList<>();
sqlProvider = JDBCUtils.getSQLProvider(JDBCUtils.getDriver(className).getClass().getCanonicalName(), tableName);
driver = JDBCUtils.getDriver(className);
}
public Connection getConnection() {
return connection;
}
@Override
public SequentialFile createSequentialFile(String fileName) {
try {
fileLocks.putIfAbsent(fileName, new Object());
JDBCSequentialFile file = new JDBCSequentialFile(this, fileName, sqlProvider, executor, fileLocks.get(fileName));
files.add(file);
return file;
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error("Could not create file", e);
}
return null;
}
@Override
public int getMaxIO() {
return 1;
}
@Override
public List<String> listFiles(String extension) throws Exception {
List<String> fileNames = new ArrayList<>();
selectFileNamesByExtension.setString(1, extension);
try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
while (rs.next()) {
fileNames.add(rs.getString(1));
}
}
return fileNames;
}
@Override
public boolean isSupportsCallbacks() {
return true;
}
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do on this case. we can just have good faith on GC
}
@Override
public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocate(size);
}
@Override
public void clearBuffer(final ByteBuffer buffer) {
final int limit = buffer.limit();
buffer.rewind();
for (int i = 0; i < limit; i++) {
buffer.put((byte) 0);
}
buffer.rewind();
}
@Override
public ByteBuffer wrapBuffer(final byte[] bytes) {
return ByteBuffer.wrap(bytes);
}
@Override
public int getAlignment() {
return 1;
}
@Override
public int calculateBlockSize(final int bytes) {
return bytes;
}
@Override
public void deactivateBuffer() {
}
@Override
public void releaseBuffer(final ByteBuffer buffer) {
}
@Override
public void activateBuffer(SequentialFile file) {
}
@Override
public File getDirectory() {
return null;
}
@Override
public synchronized void start() {
try {
if (!started) {
connection = driver.connect(connectionUrl, new Properties());
JDBCUtils.createTableIfNotExists(connection, tableName, sqlProvider.getCreateFileTableSQL());
selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
started = true;
}
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Could not start file factory, unable to connect to database");
started = false;
}
}
@Override
public synchronized void stop() {
try {
if (false)
connection.close();
}
catch (SQLException e) {
ActiveMQJournalLogger.LOGGER.error("Error stopping file factory, unable to close db connection");
}
started = false;
}
@Override
public boolean isStarted() {
return started;
}
@Override
public void createDirs() throws Exception {
}
@Override
public void flush() {
}
public synchronized void destroy() throws SQLException {
Statement statement = connection.createStatement();
statement.executeUpdate(sqlProvider.getDropFileTableSQL());
stop();
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public class DerbySQLProvider extends GenericSQLProvider {
// Derby max blob size = 2G
private static final int MAX_BLOB_SIZE = 2147483647;
private final String createFileTableSQL;
private final String appendToFileSQL;
public DerbySQLProvider(String tableName) {
super(tableName);
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
"FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
appendToFileSQL = "UPDATE " + tableName + " SET DATA = DATA || ? WHERE ID=?";
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getAppendToFileSQL() {
return appendToFileSQL;
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public class GenericSQLProvider implements SQLProvider {
// Default to lowest (MYSQL = 64k)
private static final int MAX_BLOB_SIZE = 64512;
private final String tableName;
private final String createFileTableSQL;
private final String insertFileSQL;
private final String selectFileNamesByExtensionSQL;
private final String selectIdByFileNameSQL;
private final String appendToFileSQL;
private final String readFileSQL;
private final String deleteFileSQL;
private final String updateFileNameByIdSQL;
private final String copyFileRecordByIdSQL;
private final String cloneFileRecordSQL;
private final String dropFileTableSQL;
public GenericSQLProvider(String tableName) {
this.tableName = tableName;
createFileTableSQL = "CREATE TABLE " + tableName +
"(ID INT AUTO_INCREMENT, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA BLOB, PRIMARY KEY(ID))";
insertFileSQL = "INSERT INTO " + tableName +
" (FILENAME, EXTENSION, DATA) VALUES (?,?,?)";
selectFileNamesByExtensionSQL = "SELECT FILENAME, ID FROM " + tableName + " WHERE EXTENSION=?";
selectIdByFileNameSQL = "SELECT ID, FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE fileName=?";
appendToFileSQL = "UPDATE " + tableName + " SET DATA = CONCAT(DATA, ?) WHERE ID=?";
readFileSQL = "SELECT DATA FROM " + tableName + " WHERE ID=?";
deleteFileSQL = "DELETE FROM " + tableName + " WHERE ID=?";
updateFileNameByIdSQL = "UPDATE " + tableName + " SET FILENAME=? WHERE ID=?";
cloneFileRecordSQL = "INSERT INTO " + tableName + "(FILENAME, EXTENSION, DATA) " +
"(SELECT FILENAME, EXTENSION, DATA FROM " + tableName + " WHERE ID=?)";
copyFileRecordByIdSQL = "UPDATE " + tableName + " SET DATA = (SELECT DATA FROM " + tableName + " WHERE ID=?) WHERE ID=?";
dropFileTableSQL = "DROP TABLE " + tableName;
}
@Override
public int getMaxBlobSize() {
return MAX_BLOB_SIZE;
}
@Override
public String getTableName() {
return tableName;
}
@Override
public String getCreateFileTableSQL() {
return createFileTableSQL;
}
@Override
public String getInsertFileSQL() {
return insertFileSQL;
}
@Override
public String getSelectFileByFileName() {
return selectIdByFileNameSQL;
}
@Override
public String getSelectFileNamesByExtensionSQL() {
return selectFileNamesByExtensionSQL;
}
@Override
public String getAppendToFileSQL() {
return appendToFileSQL;
}
@Override
public String getReadFileSQL() {
return readFileSQL;
}
@Override
public String getDeleteFileSQL() {
return deleteFileSQL;
}
@Override
public String getUpdateFileNameByIdSQL() {
return updateFileNameByIdSQL;
}
@Override
public String getCopyFileRecordByIdSQL() {
return copyFileRecordByIdSQL;
}
@Override
public String getCloneFileRecordByIdSQL() {
return cloneFileRecordSQL;
}
@Override
public String getDropFileTableSQL() {
return dropFileTableSQL;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.file.sql;
public interface SQLProvider {
int getMaxBlobSize();
String getTableName();
String getCreateFileTableSQL();
String getInsertFileSQL();
String getSelectFileNamesByExtensionSQL();
String getSelectFileByFileName();
String getAppendToFileSQL();
String getReadFileSQL();
String getDeleteFileSQL();
String getUpdateFileNameByIdSQL();
String getCopyFileRecordByIdSQL();
String getDropFileTableSQL();
String getCloneFileRecordByIdSQL();
}

View File

@ -229,12 +229,13 @@ public class JDBCJournalImpl implements Journal {
/* We store Transaction reference in memory (once all records associated with a Tranascation are Deleted,
we remove the Tx Records (i.e. PREPARE, COMMIT). */
private void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
private synchronized void cleanupTxRecords(List<Long> deletedRecords, List<Long> committedTx) throws SQLException {
List<RecordInfo> iterableCopy;
List<TransactionHolder> iterableCopyTx = new ArrayList<>();
iterableCopyTx.addAll(transactions.values());
for (Long txId : committedTx) {
transactions.get(txId).committed = true;
}
@ -319,7 +320,7 @@ public class JDBCJournalImpl implements Journal {
if (callback != null) callback.waitCompletion();
}
private void addTxRecord(JDBCJournalRecord record) {
private synchronized void addTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
if (txHolder == null) {
txHolder = new TransactionHolder(record.getTxId());
@ -341,7 +342,7 @@ public class JDBCJournalImpl implements Journal {
}
}
private void removeTxRecord(JDBCJournalRecord record) {
private synchronized void removeTxRecord(JDBCJournalRecord record) {
TransactionHolder txHolder = transactions.get(record.getTxId());
// We actually only need the record ID in this instance.

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.file;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class JDBCSequentialFileFactoryTest {
private static String connectionUrl = "jdbc:derby:target/data;create=true";
private static String tableName = "FILES";
private static String className = EmbeddedDriver.class.getCanonicalName();
private JDBCSequentialFileFactory factory;
@Before
public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor();
factory = new JDBCSequentialFileFactory(connectionUrl, tableName, className, executor);
factory.start();
}
@After
public void tearDown() throws SQLException {
factory.destroy();
}
@Test
public void testJDBCFileFactoryStarted() throws Exception {
assertTrue(factory.isStarted());
}
@Test
public void testCreateFiles() throws Exception {
int noFiles = 100;
Set<String> fileNames = new HashSet<String>();
for (int i = 0; i < noFiles; i++) {
String fileName = UUID.randomUUID().toString() + ".txt";
fileNames.add(fileName);
SequentialFile file = factory.createSequentialFile(fileName);
// We create files on Open
file.open();
}
List<String> queryFileNames = factory.listFiles("txt");
assertTrue(queryFileNames.containsAll(fileNames));
}
@Test
public void testAsyncAppendToFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 1);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
callback.assertEmpty(5);
checkData(file, src);
}
@Test
public void testCopyFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 5);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
JDBCSequentialFile copy = (JDBCSequentialFile) factory.createSequentialFile("copy.txt");
file.copyTo(copy);
checkData(copy, src);
checkData(file, src);
}
@Test
public void testCloneFile() throws Exception {
JDBCSequentialFile file = (JDBCSequentialFile) factory.createSequentialFile("test.txt");
file.open();
// Create buffer and fill with test data
int bufferSize = 1024;
ActiveMQBuffer src = ActiveMQBuffers.fixedBuffer(bufferSize);
for (int i = 0; i < bufferSize; i++) {
src.writeByte((byte) 5);
}
IOCallbackCountdown callback = new IOCallbackCountdown(1);
file.internalWrite(src, callback);
JDBCSequentialFile copy = (JDBCSequentialFile) file.cloneFile();
}
private void checkData(JDBCSequentialFile file, ActiveMQBuffer expectedData) throws SQLException {
expectedData.resetReaderIndex();
byte[] resultingBytes = new byte[expectedData.readableBytes()];
ByteBuffer byteBuffer = ByteBuffer.allocate(expectedData.readableBytes());
file.read(byteBuffer, null);
expectedData.getBytes(0, resultingBytes);
assertArrayEquals(resultingBytes, byteBuffer.array());
}
private class IOCallbackCountdown implements IOCallback {
private final CountDownLatch countDownLatch;
public IOCallbackCountdown(int size) {
this.countDownLatch = new CountDownLatch(size);
}
@Override
public void done() {
countDownLatch.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
fail(errorMessage);
}
public void assertEmpty(int timeout) throws InterruptedException {
countDownLatch.await(timeout, TimeUnit.SECONDS);
assertEquals(countDownLatch.getCount(), 0);
}
}
}

View File

@ -65,18 +65,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
}
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
@Override
public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
public static ByteBuffer allocateDirectByteBuffer(final int size) {
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
ByteBuffer buffer2 = null;
try {
@ -104,6 +93,21 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return buffer2;
}
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
@Override
public boolean isSupportsCallbacks() {
return timedBuffer != null;
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do on this case. we can just have good faith on GC

View File

@ -25,6 +25,8 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
private String largeMessagesTableName = ActiveMQDefaultConfiguration.getDefaultLargeMessagesTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
private String jdbcDriverClassName = ActiveMQDefaultConfiguration.getDefaultDriverClassName();
@ -49,6 +51,14 @@ public class DatabaseStorageConfiguration implements StoreConfiguration {
this.bindingsTableName = bindingsTableName;
}
public String getLargeMessageTableName() {
return largeMessagesTableName;
}
public void setLargeMessageTableName(String largeMessagesTableName) {
this.largeMessagesTableName = largeMessagesTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}

View File

@ -1144,11 +1144,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) {
NodeList databaseStoreNode = storeNode.getElementsByTagName("database-store");
DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration();
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setLargeMessageTableName(getString(storeNode, "large-message-table-name", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
conf.setJdbcDriverClassName(getString(storeNode, "jdbc-driver-class-name", conf.getJdbcDriverClassName(), Validators.NO_CHECK));
return conf;

View File

@ -16,13 +16,16 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactory;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -39,14 +42,25 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
}
@Override
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
protected synchronized void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
try {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
bindingsJournal = localBindings;
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName(), dbConf.getJdbcDriverClassName());
bindingsJournal = localBindings;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
messageJournal = localMessage;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName(), dbConf.getJdbcDriverClassName());
messageJournal = localMessage;
bindingsJournal.start();
messageJournal.start();
largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), dbConf.getLargeMessageTableName(), dbConf.getJdbcDriverClassName(), executor);
largeMessagesFactory.start();
}
catch (Exception e) {
criticalErrorListener.onIOException(e, e.getMessage(), null);
}
}
@Override
@ -76,7 +90,9 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
((JDBCJournalImpl) bindingsJournal).stop(false);
messageJournal.stop();
((JDBCJournalImpl) messageJournal).stop(false);
largeMessagesFactory.stop();
singleThreadExecutor.shutdown();
@ -85,4 +101,12 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
started = false;
}
@Override
public ByteBuffer allocateDirectBuffer(int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
}
@Override
public void freeDirectBuffer(ByteBuffer buffer) {
}
}

View File

@ -66,13 +66,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private SequentialFileFactory journalFF;
private SequentialFileFactory largeMessagesFactory;
SequentialFileFactory largeMessagesFactory;
private Journal originalMessageJournal;
private Journal originalBindingsJournal;
private String largeMessagesDirectory;
protected String largeMessagesDirectory;
private ReplicationManager replicator;

View File

@ -143,6 +143,7 @@ public class OperationContextImpl implements OperationContext {
}
// On this case, we can just execute the context directly
if (replicationLineUp.intValue() == replicated && storeLineUp.intValue() == stored &&
pageLineUp.intValue() == paged) {
// We want to avoid the executor if everything is complete...

View File

@ -1544,6 +1544,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="large-message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to large message files
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
@ -396,20 +397,13 @@ public abstract class ActiveMQTestBase extends Assert {
return createDefaultConfig(0, netty);
}
protected Configuration createDefaultJDBCConfig() throws Exception {
Configuration configuration = createDefaultConfig(true);
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGES");
dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
configuration.setStoreConfiguration(dbStorageConfiguration);
protected Configuration createDefaultJDBCConfig(boolean isNetty) throws Exception {
Configuration configuration = createDefaultConfig(isNetty);
setDBStoreType(configuration);
return configuration;
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
@ -448,6 +442,16 @@ public abstract class ActiveMQTestBase extends Assert {
return configuration;
}
private void setDBStoreType(Configuration configuration) {
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGES");
dbStorageConfiguration.setJdbcDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
configuration.setStoreConfiguration(dbStorageConfiguration);
}
protected Map<String, Object> generateInVMParams(final int node) {
Map<String, Object> params = new HashMap<>();
@ -1388,6 +1392,18 @@ public abstract class ActiveMQTestBase extends Assert {
return server;
}
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings,
StoreConfiguration.StoreType storeType) {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
setDBStoreType(configuration);
}
return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
}
protected final ActiveMQServer createServer(final boolean realFiles) throws Exception {
return createServer(realFiles, false);
}
@ -1404,6 +1420,11 @@ public abstract class ActiveMQTestBase extends Assert {
return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
}
protected ActiveMQServer createServer(final boolean realFiles, boolean isNetty, StoreConfiguration.StoreType storeType) throws Exception {
Configuration configuration = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty) : createDefaultConfig(isNetty);
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
}
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
final Configuration configuration,
final NodeManager nodeManager,

View File

@ -24,6 +24,7 @@
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
<large-message-table-name>LARGE_MESSAGE_TABLE</large-message-table-name>
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
</database-store>
</store>

View File

@ -376,6 +376,7 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
<jdbc-driver-class-name>org.apache.derby.jdbc.EmbeddedDriver</jdbc-driver-class-name>
</database-store>
</store>
@ -384,13 +385,17 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
- `jdbc-connection-url`
The full JDBC connection URL for your database server. The connection url should include all configuration parameters and database name.
- `bindings-table-name`
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `message-table-name`
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `large-message-table-name`
The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `jdbc-driver-class-name`

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -72,6 +73,10 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
protected ServerLocator locator;
public InterruptedLargeMessageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -37,7 +38,8 @@ import org.junit.Test;
*/
public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest {
public LargeMessageAvoidLargeMessagesTest() {
public LargeMessageAvoidLargeMessagesTest(StoreConfiguration.StoreType storeType) {
super(storeType);
isCompressedTest = true;
}

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
@ -45,7 +46,8 @@ import org.junit.Test;
public class LargeMessageCompressTest extends LargeMessageTest {
// Constructors --------------------------------------------------
public LargeMessageCompressTest() {
public LargeMessageCompressTest(StoreConfiguration.StoreType storeType) {
super(storeType);
isCompressedTest = true;
}

View File

@ -29,17 +29,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
@ -48,33 +48,35 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class LargeMessageTest extends LargeMessageTestBase {
// Constants -----------------------------------------------------
static final int RECEIVE_WAIT_TIME = 10000;
private final int LARGE_MESSAGE_SIZE = 20 * 1024;
private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected ServerLocator locator;
protected boolean isCompressedTest = false;
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
private int largeMessageSize;
protected boolean isNetty() {
return false;
}
public LargeMessageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
// The JDBC Large Message store is pretty slow, to speed tests up we only test 5MB large messages
largeMessageSize = (storeType == StoreConfiguration.StoreType.DATABASE) ? 5 * 1024 : 100 * 1024;
}
@Test
public void testRollbackPartiallyConsumedBuffer() throws Exception {
for (int i = 0; i < 1; i++) {
@ -82,9 +84,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
internalTestRollbackPartiallyConsumedBuffer(false);
tearDown();
setUp();
}
}
@Test
@ -93,11 +93,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception {
final int messageSize = 100 * 1024;
final int messageSize = largeMessageSize;
final ClientSession session;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
AddressSettings settings = new AddressSettings();
if (redeliveryDelay) {
@ -184,7 +185,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -229,7 +230,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
public void testDeleteOnNoBinding() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -277,7 +278,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(), storeType);
server.start();
@ -338,7 +339,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
Configuration config = createDefaultConfig(isNetty()).setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024);
Configuration config = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty()) : createDefaultConfig(isNetty());
config.setJournalFileSize(journalsize).setJournalBufferSize_AIO(10 * 1024).setJournalBufferSize_NIO(10 * 1024);
ActiveMQServer server = createServer(true, config);
@ -396,7 +398,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -445,7 +447,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
server.stop();
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -469,7 +471,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.commit();
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1);
if (storeType != StoreConfiguration.StoreType.DATABASE) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), isCompressedTest ? 0 : 1);
}
consumer = session.createConsumer(ADDRESS.concat("-2"));
@ -487,7 +491,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
validateNoFilesOnLargeDir();
if (storeType != StoreConfiguration.StoreType.DATABASE) {
validateNoFilesOnLargeDir();
}
}
@Test
@ -496,7 +502,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -557,7 +563,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -665,7 +671,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -734,7 +740,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
server.stop();
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -772,7 +778,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
try {
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -819,7 +825,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
server.stop();
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -872,7 +878,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
try {
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -944,7 +950,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
try {
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1019,7 +1025,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
ClientSession session = null;
try {
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1090,12 +1096,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testFilePersistenceOneHugeMessage() throws Exception {
testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
}
@Test
public void testFilePersistenceOneMessageStreaming() throws Exception {
testChunks(false, false, false, true, true, false, false, false, false, 1, 100 * 1024L * 1024L, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, false, true, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
@ -1105,22 +1111,22 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testFilePersistenceOneHugeMessageConsumer() throws Exception {
testChunks(false, false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
testChunks(false, false, false, true, true, false, false, false, true, 1, largeMessageSize, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
}
@Test
public void testFilePersistence() throws Exception {
testChunks(false, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, true, true, 2, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, false, false, true, true, 2, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceXA() throws Exception {
testChunks(true, false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, false, false, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
@ -1135,122 +1141,122 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testFilePersistenceXAConsumer() throws Exception {
testChunks(true, false, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceXAConsumerRestart() throws Exception {
testChunks(true, true, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, true, true, false, true, false, false, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlocked() throws Exception {
testChunks(false, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedXA() throws Exception {
testChunks(true, false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, false, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedXAConsumer() throws Exception {
testChunks(true, false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, false, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACK() throws Exception {
testChunks(false, false, true, false, true, true, true, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, true, true, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACKConsumer() throws Exception {
testChunks(false, false, true, false, true, true, true, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, true, true, true, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACKXA() throws Exception {
testChunks(true, false, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACKXARestart() throws Exception {
testChunks(true, true, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, true, true, false, true, true, true, true, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACKXAConsumer() throws Exception {
testChunks(true, false, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceBlockedPreACKXAConsumerRestart() throws Exception {
testChunks(true, true, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, true, true, false, true, true, true, true, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testFilePersistenceDelayed() throws Exception {
testChunks(false, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(false, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
}
@Test
public void testFilePersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(false, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
}
@Test
public void testFilePersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(true, false, true, false, true, false, false, false, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
}
@Test
public void testFilePersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
testChunks(true, false, true, false, true, false, false, false, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 2000);
}
@Test
public void testNullPersistence() throws Exception {
testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testNullPersistenceConsumer() throws Exception {
testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(false, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testNullPersistenceXA() throws Exception {
testChunks(true, false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, false, false, false, true, false, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testNullPersistenceXAConsumer() throws Exception {
testChunks(true, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
testChunks(true, false, true, false, false, false, false, true, true, 1, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 0);
}
@Test
public void testNullPersistenceDelayed() throws Exception {
testChunks(false, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
testChunks(false, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testNullPersistenceDelayedConsumer() throws Exception {
testChunks(false, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
testChunks(false, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testNullPersistenceDelayedXA() throws Exception {
testChunks(true, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
testChunks(true, false, true, false, false, false, false, false, false, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
public void testNullPersistenceDelayedXAConsumer() throws Exception {
testChunks(true, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
testChunks(true, false, true, false, false, false, false, false, true, 100, largeMessageSize, LargeMessageTest.RECEIVE_WAIT_TIME, 100);
}
@Test
@ -1343,7 +1349,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1410,7 +1416,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
public void testTwoBindings(final boolean restart) throws Exception {
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1439,7 +1445,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
server.stop();
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -1478,7 +1484,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
private void internalTestSendRollback(final boolean isXA, final boolean durable) throws Exception {
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1536,7 +1542,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
public void simpleRollbackInternalTest(final boolean isXA) throws Exception {
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
ActiveMQServer server = createServer(true, isNetty());
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -1644,7 +1650,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int NUMBER_OF_MESSAGES = 30;
try {
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -1730,7 +1736,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int NUMBER_OF_MESSAGES = 1000;
try {
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -1821,7 +1827,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
server.start();
final int numberOfBytes = 1024;
@ -1871,7 +1877,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
server.stop();
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
server.start();
sf = createSessionFactory(locator);
@ -1948,7 +1954,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
server.start();
final int numberOfBytes = 1024;
@ -2048,11 +2054,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int SIZE = 10 * 1024 * 1024;
try {
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
locator.setMinLargeMessageSize(100 * 1024);
locator.setMinLargeMessageSize(largeMessageSize * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
@ -2081,7 +2087,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
msg2.acknowledge();
msg2.setOutputStream(createFakeOutputStream());
Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
Assert.assertTrue(msg2.waitOutputStreamCompletion(0));
session.commit();
@ -2089,6 +2095,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
}
catch (Throwable t) {
t.printStackTrace();
throw t;
}
finally {
try {
session.close();
@ -2115,11 +2125,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int SIZE = 0;
try {
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
locator.setMinLargeMessageSize(100 * 1024);
locator.setMinLargeMessageSize(largeMessageSize * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
@ -2184,11 +2194,11 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int SIZE = 0;
try {
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
locator.setMinLargeMessageSize(100 * 1024);
locator.setMinLargeMessageSize(largeMessageSize * 1024);
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
@ -2265,7 +2275,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
final int SIZE = 10 * 1024;
final int NUMBER_OF_MESSAGES = 1;
server = createServer(true, isNetty());
server = createServer(true, isNetty(), storeType);
server.start();
@ -2320,7 +2330,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The ClientConsumer should be able to also send ServerLargeMessages as that's done by the CoreBridge
@Test
public void testSendServerMessage() throws Exception {
ActiveMQServer server = createServer(true);
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
@ -2332,12 +2342,12 @@ public class LargeMessageTest extends LargeMessageTestBase {
fileMessage.setMessageID(1005);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, LARGE_MESSAGE_SIZE);
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources();
@ -2359,9 +2369,9 @@ public class LargeMessageTest extends LargeMessageTestBase {
Assert.assertNotNull(msg);
Assert.assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
Assert.assertEquals(msg.getBodySize(), largeMessageSize);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
for (int i = 0; i < largeMessageSize; i++) {
Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), msg.getBodyBuffer().readByte());
}
@ -2379,6 +2389,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
public void setUp() throws Exception {
super.setUp();
locator = createFactory(isNetty());
locator.setCallTimeout(100000000);
}
protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception {
@ -2392,7 +2403,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
ActiveMQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map, storeType);
server.start();
final int numberOfBytes = 1024;
@ -2435,7 +2446,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
if (realFiles) {
server.stop();
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map, storeType);
server.start();
sf = createSessionFactory(locator);

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.junit.After;
import org.junit.Before;
@ -33,7 +34,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class JDBCJournalTest {
public class JDBCJournalTest extends ActiveMQTestBase {
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();

View File

@ -28,6 +28,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
@ -35,18 +37,23 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.DeflaterReader;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@RunWith(Parameterized.class)
public abstract class LargeMessageTestBase extends ActiveMQTestBase {
// Constants -----------------------------------------------------
@ -66,6 +73,20 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
// Protected -----------------------------------------------------
protected StoreConfiguration.StoreType storeType;
public LargeMessageTestBase(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType={0}")
public static Collection<Object[]> data() {
// Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.DATABASE}};
//Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}};
return Arrays.asList(params);
}
protected void testChunks(final boolean isXA,
final boolean restartOnXA,
final boolean rollbackFirstSend,
@ -99,7 +120,15 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
final int minSize) throws Exception {
clearDataRecreateServerDirs();
ActiveMQServer server = createServer(realFiles);
Configuration configuration;
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig(true);
}
else {
configuration = createDefaultConfig(false);
}
ActiveMQServer server = createServer(realFiles, configuration);
server.start();
ServerLocator locator = createInVMNonHALocator();
@ -200,7 +229,7 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
if (realFiles) {
server.stop();
server = createServer(realFiles);
server = createServer(realFiles, configuration);
server.start();
sf = locator.createSessionFactory();
@ -352,13 +381,14 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
@Override
public void write(final byte[] b) throws IOException {
if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
bytesRead.addAndGet(b.length);
if (b.length > 0) {
if (b[0] == ActiveMQTestBase.getSamplebyte(bytesRead.get())) {
bytesRead.addAndGet(b.length);
}
else {
LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
}
}
else {
LargeMessageTestBase.log.warn("Received invalid packet at position " + bytesRead.get());
}
}
@Override
@ -426,12 +456,17 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
validateNoFilesOnLargeDir();
}
catch (Throwable e) {
e.printStackTrace();
throw e;
}
finally {
locator.close();
try {
server.stop();
}
catch (Throwable ignored) {
ignored.printStackTrace();
}
}
}
@ -442,7 +477,7 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
* @param delayDelivery
* @param session
* @param producer
* @throws FileNotFoundException
* @throws Exception
* @throws IOException
* @throws ActiveMQException
*/
@ -523,7 +558,6 @@ public abstract class LargeMessageTestBase extends ActiveMQTestBase {
* @param queueToRead
* @param numberOfBytes
* @throws ActiveMQException
* @throws FileNotFoundException
* @throws IOException
*/
protected void readMessage(final ClientSession session,

View File

@ -125,7 +125,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
protected void createStorage() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig());
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig(true));
}
else {
journal = createJournalStorageManager(createDefaultInVMConfig());

View File

@ -101,7 +101,7 @@ public class BasicXaRecoveryTest extends ActiveMQTestBase {
addressSettings.clear();
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig().setJMXManagementEnabled(true);
configuration = createDefaultJDBCConfig(true).setJMXManagementEnabled(true);
}
else {
configuration = createDefaultInVMConfig().setJMXManagementEnabled(true);

View File

@ -89,7 +89,7 @@ public class BasicXaTest extends ActiveMQTestBase {
addressSettings.clear();
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig();
configuration = createDefaultJDBCConfig(true);
}
else {
configuration = createDefaultNettyConfig();

View File

@ -99,7 +99,7 @@ public class XaTimeoutTest extends ActiveMQTestBase {
addressSettings.clear();
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig();
configuration = createDefaultJDBCConfig(true);
}
else {
configuration = createBasicConfig();

View File

@ -16,11 +16,16 @@
*/
package org.apache.activemq.artemis.tests.stress.chunk;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.junit.Test;
public class LargeMessageStressTest extends LargeMessageTestBase {
public LargeMessageStressTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------