This closes #295

This commit is contained in:
Clebert Suconic 2016-01-13 09:49:25 -05:00
commit a8c4ebd6a4
76 changed files with 5906 additions and 3201 deletions

1
.gitignore vendored
View File

@ -9,6 +9,7 @@ ratReport.txt
.settings
.checkstyle
.factorypath
**/derby.log
# for native build
CMakeCache.txt

View File

@ -47,7 +47,8 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -235,7 +236,7 @@ public class PrintData extends LockAbstract {
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
@ -248,7 +249,7 @@ public class PrintData extends LockAbstract {
set.add(encoding.position);
}
else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE) {
JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
encoding.decode(buff);
Long queueID = Long.valueOf(encoding.queueID);
@ -260,7 +261,7 @@ public class PrintData extends LockAbstract {
}
else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
if (record.isUpdate) {
JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
pageUpdate.decode(buff);
cursorInfo.getPgTXs().add(pageUpdate.pageTX);

View File

@ -71,10 +71,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;

View File

@ -396,6 +396,18 @@ public final class ActiveMQDefaultConfiguration {
// How often the reaper will be run to check for timed out group bindings. Only valid for LOCAL handlers
private static long DEFAULT_GROUPING_HANDLER_REAPER_PERIOD = 30000;
// Which store type to use, options are FILE or DATABASE, FILE is default.
private static String DEFAULT_STORE_TYPE = "FILE";
// Default database url. Derby database is used by default.
private static String DEFAULT_DATABASE_URL = "jdbc:derby:data/derby;create=true";
// Default message table name, used with Database storage type
private static String DEFAULT_MESSAGE_TABLE_NAME = "MESSAGES";
// Default bindings table name, used with Database storage type
private static String DEFAULT_BINDINGS_TABLE_NAME = "BINDINGS";
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1052,4 +1064,28 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_GROUPING_HANDLER_REAPER_PERIOD;
}
/**
* The default storage type. Options are FILE and DATABASE.
*/
public static String getDefaultStoreType() {
return DEFAULT_STORE_TYPE;
}
/**
* The default database URL, used with DATABASE store type.
*/
public static String getDefaultDatabaseUrl() {
return DEFAULT_DATABASE_URL;
}
/**
* The default Message Journal table name, used with DATABASE store.
*/
public static String getDefaultMessageTableName() {
return DEFAULT_MESSAGE_TABLE_NAME;
}
public static String getDefaultBindingsTableName() {
return DEFAULT_BINDINGS_TABLE_NAME;
}
}

View File

@ -132,6 +132,11 @@
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-website</artifactId>

View File

@ -54,6 +54,7 @@
<include>org.apache.activemq:artemis-jms-client</include>
<include>org.apache.activemq:artemis-jms-server</include>
<include>org.apache.activemq:artemis-journal</include>
<include>org.apache.activemq:artemis-jdbc-store</include>
<include>org.apache.activemq:artemis-native</include>
<include>org.apache.activemq:artemis-amqp-protocol</include>
<include>org.apache.activemq:artemis-openwire-protocol</include>
@ -93,6 +94,7 @@
<include>org.fusesource.hawtbuf:hawtbuf</include>
<include>org.jgroups:jgroups</include>
<include>io.netty:netty-codec-mqtt</include>
<include>org.apache.derby:derby</include>
</includes>
<!--excludes>
<exclude>org.apache.activemq:artemis-website</exclude>

View File

@ -0,0 +1,74 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.2.1-SNAPSHOT</version>
</parent>
<artifactId>artemis-jdbc-store</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JDBC Store</name>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,607 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.derby.jdbc.AutoloadedDriver;
public class JDBCJournalImpl implements Journal {
// Sync Delay in ms
public static final int SYNC_DELAY = 500;
private static int USER_VERSION = 1;
private final String tableName;
private Connection connection;
private List<JDBCJournalRecord> records;
private PreparedStatement insertJournalRecords;
private PreparedStatement selectJournalRecords;
private PreparedStatement countJournalRecords;
private PreparedStatement deleteJournalRecords;
private PreparedStatement deleteTxJournalRecords;
private boolean started;
private String jdbcUrl;
private Timer syncTimer;
private Driver dbDriver;
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private boolean isDerby = false;
public JDBCJournalImpl(String jdbcUrl, String tableName) {
this.tableName = tableName;
this.jdbcUrl = jdbcUrl;
records = new ArrayList<JDBCJournalRecord>();
}
@Override
public void start() throws Exception {
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
if (drivers.size() <= 2 && drivers.size() > 0) {
dbDriver = drivers.get(0);
isDerby = dbDriver instanceof AutoloadedDriver;
if (drivers.size() > 1 && isDerby) {
dbDriver = drivers.get(1);
}
if (isDerby) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception e) {
}
}
});
}
}
else {
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
throw new RuntimeException(error);
}
connection = dbDriver.connect(jdbcUrl, new Properties());
// If JOURNAL table doesn't exist then create it
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
if (!rs.next()) {
Statement statement = connection.createStatement();
statement.executeUpdate(JDBCJournalRecord.createTableSQL(tableName));
}
insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
deleteTxJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteTxRecordsSQL(tableName));
syncTimer = new Timer();
syncTimer.scheduleAtFixedRate(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
started = true;
}
@Override
public void stop() throws Exception {
stop(true);
}
public synchronized void stop(boolean shutdownConnection) throws Exception {
if (started) {
syncTimer.cancel();
sync();
if (shutdownConnection) {
connection.close();
}
started = false;
}
}
public synchronized void destroy() throws Exception {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE " + tableName);
connection.commit();
stop();
}
public int sync() throws SQLException {
List<JDBCJournalRecord> recordRef = records;
records = new ArrayList<JDBCJournalRecord>();
for (JDBCJournalRecord record : recordRef) {
record.storeLineUp();
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
record.writeDeleteTxRecord(deleteTxJournalRecords);
break;
default:
record.writeRecord(insertJournalRecords);
break;
}
}
boolean success = false;
try {
connection.setAutoCommit(false);
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteTxJournalRecords.executeBatch();
connection.commit();
success = true;
}
catch (SQLException e) {
connection.rollback();
e.printStackTrace();
}
executeCallbacks(recordRef, success);
return recordRef.size();
}
// TODO Use an executor.
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
Runnable r = new Runnable() {
@Override
public void run() {
for (JDBCJournalRecord record : records) {
record.complete(result);
}
}
};
Thread t = new Thread(r);
t.start();
}
private void appendRecord(JDBCJournalRecord record) throws SQLException {
try {
journalLock.writeLock().lock();
records.add(record);
}
finally {
journalLock.writeLock().unlock();
}
}
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendAddRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID,
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setStoreLineUp(lineUpContext);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID,
EncodingSupport transactionData,
boolean sync,
IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
r.setTxId(txID);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setSync(sync);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
JournalLoadInformation jli = new JournalLoadInformation();
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
JDBCJournalRecord r;
try (ResultSet rs = selectJournalRecords.executeQuery()) {
int noRecords = 0;
while (rs.next()) {
r = JDBCJournalRecord.readRecord(rs);
switch (r.getRecordType()) {
case JDBCJournalRecord.ADD_RECORD:
jrc.onReadAddRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD:
jrc.onReadUpdateRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD:
jrc.onReadDeleteRecord(r.getId());
break;
case JDBCJournalRecord.ADD_RECORD_TX:
jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD_TX:
jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.PREPARE_RECORD:
jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.COMMIT_RECORD:
jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
jrc.onReadRollbackRecord(r.getTxId());
break;
default:
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
}
noRecords++;
}
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords);
}
return jli;
}
@Override
public JournalLoadInformation loadInternalOnly() throws Exception {
return null;
}
@Override
public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception {
return null;
}
@Override
public void lineUpContext(IOCompletion callback) {
callback.storeLineUp();
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
JDBCJournalLoaderCallback lc = new JDBCJournalLoaderCallback(committedRecords, preparedTransactions, failureCallback, fixBadTX);
return load(lc);
}
@Override
public int getAlignment() throws Exception {
return 0;
}
@Override
public int getNumberOfRecords() {
int count = 0;
try (ResultSet rs = countJournalRecords.executeQuery()) {
rs.next();
count = rs.getInt(1);
}
catch (SQLException e) {
return -1;
}
return count;
}
@Override
public int getUserVersion() {
return USER_VERSION;
}
@Override
public void perfBlast(int pages) {
}
@Override
public void runDirectJournalBlast() throws Exception {
}
@Override
public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception {
return null;
}
public final void synchronizationLock() {
journalLock.writeLock().lock();
}
public final void synchronizationUnlock() {
journalLock.writeLock().unlock();
}
@Override
public void forceMoveNextFile() throws Exception {
}
@Override
public JournalFile[] getDataFiles() {
return new JournalFile[0];
}
@Override
public SequentialFileFactory getFileFactory() {
return null;
}
@Override
public int getFileSize() {
return 0;
}
@Override
public void scheduleCompactAndBlock(int timeout) throws Exception {
}
@Override
public void replicationSyncPreserveOldFiles() {
}
@Override
public void replicationSyncFinished() {
}
@Override
public boolean isStarted() {
return started;
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
public class JDBCJournalLoaderCallback implements LoaderCallback {
private static final int DELETE_FLUSH = 20000;
private final List<PreparedTransactionInfo> preparedTransactions;
private final TransactionFailureCallback failureCallback;
private final boolean fixBadTX;
/* We keep track of list entries for each ID. This preserves order and allows multiple record insertions with the
same ID. We use this for deleting records */
private final Map<Long, List<Integer>> deleteReferences = new HashMap<Long, List<Integer>>();
private Runtime runtime = Runtime.getRuntime();
private final List<RecordInfo> committedRecords;
private long maxId = -1;
public JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) {
this.committedRecords = committedRecords;
this.preparedTransactions = preparedTransactions;
this.failureCallback = failureCallback;
this.fixBadTX = fixBadTX;
}
public synchronized void checkMaxId(long id) {
if (maxId < id) {
maxId = id;
}
}
public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) {
preparedTransactions.add(preparedTransaction);
}
public synchronized void addRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
ArrayList<Integer> indexes = new ArrayList<Integer>();
indexes.add(index);
deleteReferences.put(info.id, indexes);
checkMaxId(info.id);
}
public synchronized void updateRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
deleteReferences.get(info.id).add(index);
}
public synchronized void deleteRecord(final long id) {
for (Integer i : deleteReferences.get(id)) {
committedRecords.remove(i);
}
}
public int getNoRecords() {
return committedRecords.size();
}
@Override
public void failedTransaction(final long transactionID,
final List<RecordInfo> records,
final List<RecordInfo> recordsToDelete) {
if (failureCallback != null) {
failureCallback.failedTransaction(transactionID, records, recordsToDelete);
}
}
public long getMaxId() {
return maxId;
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalTransaction;
public class JDBCJournalReaderCallback implements JournalReaderCallback {
private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
private final LoaderCallback loadManager;
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
public JDBCJournalReaderCallback(LoaderCallback loadManager) {
this.loadManager = loadManager;
}
public void onReadAddRecord(final RecordInfo info) throws Exception {
loadManager.addRecord(info);
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
loadManager.updateRecord(info);
}
public void onReadDeleteRecord(final long recordID) throws Exception {
loadManager.deleteRecord(recordID);
}
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
onReadAddRecordTX(transactionID, info);
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordInfos.add(info);
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordsToDelete.add(info);
}
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.prepared = true;
tx.extraData = extraData;
}
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx != null) {
// JournalTransaction journalTransaction = transactions.remove(transactionID);
// if (journalTransaction == null)
// {
// throw new IllegalStateException("Cannot Commit, tx not found with ID: " + transactionID);
// }
for (RecordInfo txRecord : tx.recordInfos) {
if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord);
}
else {
loadManager.addRecord(txRecord);
}
}
for (RecordInfo deleteValue : tx.recordsToDelete) {
loadManager.deleteRecord(deleteValue.id);
}
}
}
public void onReadRollbackRecord(final long transactionID) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx == null) {
throw new IllegalStateException("Cannot rollback, tx not found with ID: " + transactionID);
}
}
@Override
public void markAsDataFile(JournalFile file) {
// Not needed for JDBC journal impl
}
}

View File

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
public class JDBCJournalRecord {
/*
Database Table Schema:
id BIGINT (long)
recordType SMALLINT (byte)
compactCount SMALLINT (byte)
txId BIGINT (long)
userRecordType SMALLINT (byte)
variableSize INT (int)
record BLOB (InputStream)
txDataSize INT (int)
txData BLOB (InputStream)
txCheckNoRecords INT (int)
*/
// Record types taken from Journal Impl
public static final byte ADD_RECORD = 11;
public static final byte UPDATE_RECORD = 12;
public static final byte ADD_RECORD_TX = 13;
public static final byte UPDATE_RECORD_TX = 14;
public static final byte DELETE_RECORD_TX = 15;
public static final byte DELETE_RECORD = 16;
public static final byte PREPARE_RECORD = 17;
public static final byte COMMIT_RECORD = 18;
public static final byte ROLLBACK_RECORD = 19;
// Callback and sync operations
private IOCompletion ioCompletion = null;
private boolean storeLineUp = false;
private boolean sync = false;
// DB Fields for all records
private Long id;
private byte recordType;
private byte compactCount;
private long txId;
// DB fields for ADD_RECORD(TX), UPDATE_RECORD(TX),
private int variableSize;
protected byte userRecordType;
private InputStream record;
// DB Fields for PREPARE_RECORD
private int txDataSize;
private InputStream txData;
// DB Fields for COMMIT_RECORD and PREPARE_RECORD
private int txCheckNoRecords;
private boolean isUpdate;
public JDBCJournalRecord(long id, byte recordType) {
this.id = id;
this.recordType = recordType;
this.isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
// set defaults
compactCount = 0;
txId = 0;
variableSize = 0;
userRecordType = -1;
record = new ByteArrayInputStream(new byte[0]);
txDataSize = 0;
txData = new ByteArrayInputStream(new byte[0]);
txCheckNoRecords = 0;
}
public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT, " + "recordType SMALLINT, " + "compactCount SMALLINT, " + "txId BIGINT, " + "userRecordType SMALLINT, " + "variableSize INTEGER, " + "record BLOB, " + "txDataSize INTEGER, " + "txData BLOB, " + "txCheckNoRecords INTEGER)";
}
public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords) " + "VALUES (?,?,?,?,?,?,?,?,?,?)";
}
public static String selectRecordsSQL(String tableName) {
return "SELECT id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords " + "FROM " + tableName;
}
public static String deleteRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id = ?";
}
public static String deleteTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId = ?";
}
public void complete(boolean success) {
if (ioCompletion != null) {
if (success) {
ioCompletion.done();
}
else {
ioCompletion.onError(1, "DATABASE INSERT FAILED");
}
}
}
public void storeLineUp() {
if (storeLineUp && ioCompletion != null) {
ioCompletion.storeLineUp();
}
}
protected void writeRecord(PreparedStatement statement) throws SQLException {
statement.setLong(1, id);
statement.setByte(2, recordType);
statement.setByte(3, compactCount);
statement.setLong(4, txId);
statement.setByte(5, userRecordType);
statement.setInt(6, variableSize);
statement.setBlob(7, record);
statement.setInt(8, txDataSize);
statement.setBlob(9, txData);
statement.setInt(10, txCheckNoRecords);
statement.addBatch();
}
protected void writeDeleteTxRecord(PreparedStatement deleteTxStatement) throws SQLException {
deleteTxStatement.setLong(1, txId);
deleteTxStatement.addBatch();
}
protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
deleteStatement.setLong(1, id);
deleteStatement.addBatch();
}
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2));
record.setCompactCount((byte) rs.getShort(3));
record.setTxId(rs.getLong(4));
record.setUserRecordType((byte) rs.getShort(5));
record.setVariableSize(rs.getInt(6));
record.setRecord(rs.getBytes(7));
record.setTxDataSize(rs.getInt(8));
record.setTxData(rs.getBytes(9));
record.setTxCheckNoRecords(rs.getInt(10));
return record;
}
public IOCompletion getIoCompletion() {
return ioCompletion;
}
public void setIoCompletion(IOCompletion ioCompletion) {
this.ioCompletion = ioCompletion;
}
public boolean isStoreLineUp() {
return storeLineUp;
}
public void setStoreLineUp(boolean storeLineUp) {
this.storeLineUp = storeLineUp;
}
public boolean isSync() {
return sync;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public Long getId() {
return id;
}
public byte getRecordType() {
return recordType;
}
public byte getCompactCount() {
return compactCount;
}
public void setCompactCount(byte compactCount) {
this.compactCount = compactCount;
}
public long getTxId() {
return txId;
}
public void setTxId(long txId) {
this.txId = txId;
}
public int getVariableSize() {
return variableSize;
}
public void setVariableSize(int variableSize) {
this.variableSize = variableSize;
}
public byte getUserRecordType() {
return userRecordType;
}
public void setUserRecordType(byte userRecordType) {
this.userRecordType = userRecordType;
}
public void setRecord(byte[] record) {
this.variableSize = record.length;
this.record = new ByteArrayInputStream(record);
}
public void setRecord(InputStream record) {
this.record = record;
}
public void setRecord(EncodingSupport record) {
this.variableSize = record.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
record.encode(encodedBuffer);
this.record = new ActiveMQBufferInputStream(encodedBuffer);
}
public InputStream getRecord() {
return record;
}
public int getTxCheckNoRecords() {
return txCheckNoRecords;
}
public void setTxCheckNoRecords(int txCheckNoRecords) {
this.txCheckNoRecords = txCheckNoRecords;
}
public void setTxDataSize(int txDataSize) {
this.txDataSize = txDataSize;
}
public int getTxDataSize() {
return txDataSize;
}
public InputStream getTxData() {
return txData;
}
public void setTxData(InputStream record) {
this.record = record;
}
public void setTxData(EncodingSupport txData) {
this.txDataSize = txData.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
txData.encode(encodedBuffer);
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
}
public void setTxData(byte[] txData) {
this.txDataSize = txData.length;
this.txData = new ByteArrayInputStream(txData);
}
public boolean isUpdate() {
return isUpdate;
}
public byte[] getRecordData() throws IOException {
byte[] data = new byte[variableSize];
record.read(data);
return data;
}
public byte[] getTxDataAsByteArray() throws IOException {
byte[] data = new byte[txDataSize];
txData.read(data);
return data;
}
public RecordInfo toRecordInfo() throws IOException {
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.SQLException;
import java.util.TimerTask;
public class JDBCJournalSync extends TimerTask {
private final JDBCJournalImpl journal;
public JDBCJournalSync(JDBCJournalImpl journal) {
this.journal = journal;
}
@Override
public void run() {
try {
journal.sync();
}
catch (SQLException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.core.journal.RecordInfo;
final class TransactionHolder {
public TransactionHolder(final long id) {
transactionID = id;
}
public final long transactionID;
public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public boolean prepared;
public boolean invalid;
public byte[] extraData;
}

View File

@ -21,17 +21,33 @@ import java.util.List;
public class PreparedTransactionInfo {
public final long id;
private final long id;
public final byte[] extraData;
private final byte[] extraData;
public final List<RecordInfo> records = new ArrayList<>();
private final List<RecordInfo> records = new ArrayList<RecordInfo>();
public final List<RecordInfo> recordsToDelete = new ArrayList<>();
private final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public PreparedTransactionInfo(final long id, final byte[] extraData) {
this.id = id;
this.extraData = extraData;
}
public long getId() {
return id;
}
public byte[] getExtraData() {
return extraData;
}
public List<RecordInfo> getRecords() {
return records;
}
public List<RecordInfo> getRecordsToDelete() {
return recordsToDelete;
}
}

View File

@ -1786,9 +1786,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.records.addAll(transaction.recordInfos);
info.getRecords().addAll(transaction.recordInfos);
info.recordsToDelete.addAll(transaction.recordsToDelete);
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
loadManager.addPreparedTransaction(info);
}

View File

@ -22,13 +22,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
public class JournalAddRecord extends JournalInternalRecord {
private final long id;
protected final long id;
private final EncodingSupport record;
protected final EncodingSupport record;
private final byte recordType;
protected final byte recordType;
private final boolean add;
protected final boolean add;
/**
* @param id

View File

@ -66,6 +66,11 @@
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>

View File

@ -271,8 +271,9 @@ public interface Configuration {
/**
* Add an acceptor to the config
*
* @param name the name of the acceptor
* @param uri the URI of the acceptor
* @param uri the URI of the acceptor
* @return this
* @throws Exception in case of Parsing errors on the URI
*/
@ -935,4 +936,7 @@ public interface Configuration {
*/
File getBrokerInstance();
StoreConfiguration getStoreConfiguration();
Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
public interface StoreConfiguration extends Serializable {
public enum StoreType {
FILE,
DATABASE
}
StoreType getStoreType();
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.security.Role;
@ -228,6 +229,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private HAPolicyConfiguration haPolicyConfiguration;
private StoreConfiguration storeConfiguration;
/**
* Parent folder for all data folders.
*/
@ -407,7 +410,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public ConfigurationImpl addConnectorConfiguration(final String name, final String uri) throws Exception {
@ -422,7 +424,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public ConfigurationImpl clearConnectorConfigurations() {
connectorConfigs.clear();
@ -1278,6 +1279,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return resolveProtocols;
}
@Override
public StoreConfiguration getStoreConfiguration() {
return storeConfiguration;
}
@Override
public ConfigurationImpl setStoreConfiguration(StoreConfiguration storeConfiguration) {
this.storeConfiguration = storeConfiguration;
return this;
}
@Override
public int hashCode() {
final int prime = 31;

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
public class DatabaseStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
}
public String getMessageTableName() {
return messageTableName;
}
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
public String getBindingsTableName() {
return bindingsTableName;
}
public void setBindingsTableName(String bindingsTableName) {
this.bindingsTableName = bindingsTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
public class FileStorageConfiguration implements StoreConfiguration {
private String messageTableName = ActiveMQDefaultConfiguration.getDefaultMessageTableName();
private String bindingsTableName = ActiveMQDefaultConfiguration.getDefaultBindingsTableName();
private String jdbcConnectionUrl = ActiveMQDefaultConfiguration.getDefaultDatabaseUrl();
@Override
public StoreType getStoreType() {
return StoreType.DATABASE;
}
public String getMessageTableName() {
return messageTableName;
}
public void setMessageTableName(String messageTableName) {
this.messageTableName = messageTableName;
}
public String getBindingsTableName() {
return bindingsTableName;
}
public void setBindingsTableName(String bindingsTableName) {
this.bindingsTableName = bindingsTableName;
}
public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
}
public String getJdbcConnectionUrl() {
return jdbcConnectionUrl;
}
}

View File

@ -54,6 +54,8 @@ import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfigu
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.Validators;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.config.storage.FileStorageConfiguration;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -214,6 +216,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration());
}
NodeList storeTypeNodes = e.getElementsByTagName("store");
if (storeTypeNodes.getLength() > 0) {
parseStoreConfiguration((Element) storeTypeNodes.item(0), config);
}
config.setResolveProtocols(getBoolean(e, "resolve-protocols", config.isResolveProtocols()));
config.setPersistenceEnabled(getBoolean(e, "persistence-enabled", config.isPersistenceEnabled()));
@ -666,7 +674,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return securityMatch;
}
private Pair<SecuritySettingPlugin,Map<String,String>> parseSecuritySettingPlugins(Node item) {
private Pair<SecuritySettingPlugin, Map<String, String>> parseSecuritySettingPlugins(Node item) {
final String clazz = item.getAttributes().getNamedItem("class-name").getNodeValue();
final Map<String, String> settings = new HashMap<>();
NodeList children = item.getChildNodes();
@ -905,6 +913,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
HA_LIST.add("replication");
}
private static final ArrayList<String> STORE_TYPE_LIST = new ArrayList<>();
static {
STORE_TYPE_LIST.add("database-store");
STORE_TYPE_LIST.add("file-store");
}
private void parseStoreConfiguration(final Element e, final Configuration mainConfig) {
for (String storeType : STORE_TYPE_LIST) {
NodeList storeNodeList = e.getElementsByTagName(storeType);
if (storeNodeList.getLength() > 0) {
Element storeNode = (Element) storeNodeList.item(0);
if (storeNode.getTagName().equals("database-store")) {
mainConfig.setStoreConfiguration(createDatabaseStoreConfig(storeNode));
}
else if (storeNode.getTagName().equals("file-store")) {
mainConfig.setStoreConfiguration(createFileStoreConfig(storeNode));
}
}
}
}
private void parseHAPolicyConfiguration(final Element e, final Configuration mainConfig) {
for (String haType : HA_LIST) {
NodeList haNodeList = e.getElementsByTagName(haType);
@ -1105,6 +1135,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
return null;
}
private DatabaseStorageConfiguration createDatabaseStoreConfig(Element storeNode) {
NodeList databaseStoreNode = storeNode.getElementsByTagName("database-store");
DatabaseStorageConfiguration conf = new DatabaseStorageConfiguration();
conf.setBindingsTableName(getString(storeNode, "bindings-table-name", conf.getBindingsTableName(), Validators.NO_CHECK));
conf.setMessageTableName(getString(storeNode, "message-table-name", conf.getMessageTableName(), Validators.NO_CHECK));
conf.setJdbcConnectionUrl(getString(storeNode, "jdbc-connection-url", conf.getJdbcConnectionUrl(), Validators.NO_CHECK));
return conf;
}
private FileStorageConfiguration createFileStoreConfig(Element storeNode) {
return new FileStorageConfiguration();
}
private void parseBroadcastGroupConfiguration(final Element e, final Configuration mainConfig) {
String name = e.getAttribute("name");

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
public final class AckDescribe {
public RefEncoding refEncoding;
public AckDescribe(RefEncoding refEncoding) {
this.refEncoding = refEncoding;
}
@Override
public String toString() {
return "ACK;" + refEncoding;
}
}

View File

@ -26,9 +26,9 @@ public final class AddMessageRecord {
final ServerMessage message;
long scheduledDeliveryTime;
private long scheduledDeliveryTime;
int deliveryCount;
private int deliveryCount;
public ServerMessage getMessage() {
return message;
@ -41,4 +41,12 @@ public final class AddMessageRecord {
public int getDeliveryCount() {
return deliveryCount;
}
public void setScheduledDeliveryTime(long scheduledDeliveryTime) {
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
public void setDeliveryCount(int deliveryCount) {
this.deliveryCount = deliveryCount;
}
}

View File

@ -28,31 +28,30 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DeliveryCountUpdateEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DuplicateIDEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.HeuristicCompletionEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.LargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountPendingImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@ -200,15 +199,15 @@ public final class DescribeJournal {
public void checkRecordCounter(RecordInfo info) {
if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) newObjectEncoding(info);
long queueIDForCounter = encoding.queueID;
long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.value) {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.value);
if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.getValue()) {
out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
}
subsCounter.loadValue(info.id, encoding.value);
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
if (subsCounter.getValue() < 0) {
@ -221,13 +220,13 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info);
long queueIDForCounter = encoding.queueID;
long queueIDForCounter = encoding.getQueueID();
PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.value);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.value);
out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
if (subsCounter.getValue() < 0) {
out.println(" #NegativeCounter!!!!");
}
@ -311,20 +310,20 @@ public final class DescribeJournal {
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
PageCountRecord encoding = (PageCountRecord) o;
queueIDForCounter = encoding.queueID;
queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadValue(info.id, encoding.value);
subsCounter.loadValue(info.id, encoding.getValue());
subsCounter.processReload();
}
else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
PageCountRecordInc encoding = (PageCountRecordInc) o;
queueIDForCounter = encoding.queueID;
queueIDForCounter = encoding.getQueueID();
subsCounter = lookupCounter(counters, queueIDForCounter);
subsCounter.loadInc(info.id, encoding.value);
subsCounter.loadInc(info.id, encoding.getValue());
subsCounter.processReload();
}
@ -345,8 +344,8 @@ public final class DescribeJournal {
out.println("### Prepared TX ###");
for (PreparedTransactionInfo tx : preparedTransactions) {
out.println(tx.id);
for (RecordInfo info : tx.records) {
out.println(tx.getId());
for (RecordInfo info : tx.getRecords()) {
Object o = newObjectEncoding(info);
out.println("- " + describeRecord(info, o));
if (info.getUserRecordType() == 31) {
@ -365,7 +364,7 @@ public final class DescribeJournal {
}
}
for (RecordInfo info : tx.recordsToDelete) {
for (RecordInfo info : tx.getRecordsToDelete()) {
out.println("- " + describeRecord(info) + " <marked to delete>");
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
final class DummyOperationContext implements OperationContext {
private static DummyOperationContext instance = new DummyOperationContext();
public static OperationContext getInstance() {
return DummyOperationContext.instance;
}
public void executeOnCompletion(final IOCallback runnable) {
// There are no executeOnCompletion calls while using the DummyOperationContext
// However we keep the code here for correctness
runnable.done();
}
public void replicationDone() {
}
public void replicationLineUp() {
}
public void storeLineUp() {
}
public void done() {
}
public void onError(final int errorCode, final String errorMessage) {
}
public void waitCompletion() {
}
public boolean waitCompletion(final long timeout) {
return true;
}
public void pageSyncLineUp() {
}
public void pageSyncDone() {
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.utils.ExecutorFactory;
public class JDBCJournalStorageManager extends JournalStorageManager {
public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) {
super(config, executorFactory);
}
public JDBCJournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final IOCriticalErrorListener criticalErrorListener) {
super(config, executorFactory, criticalErrorListener);
}
@Override
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();
Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName());
bindingsJournal = localBindings;
Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName());
messageJournal = localMessage;
}
@Override
public synchronized void stop(boolean ioCriticalError) throws Exception {
if (!started) {
return;
}
if (!ioCriticalError) {
performCachedLargeMessageDeletes();
// Must call close to make sure last id is persisted
if (journalLoaded && idGenerator != null)
idGenerator.persistCurrentID();
}
final CountDownLatch latch = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
latch.await(30, TimeUnit.SECONDS);
beforeStop();
((JDBCJournalImpl) bindingsJournal).stop(false);
messageJournal.stop();
singleThreadExecutor.shutdown();
journalLoaded = false;
started = false;
}
}

View File

@ -26,7 +26,8 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
public final class JournalRecordIds {
// grouping journal record type
static final byte GROUP_RECORD = 20;
public static final byte GROUP_RECORD = 20;
// BindingsImpl journal record type

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
private AbstractJournalStorageManager journalStorageManager;
private final Map<Long, ServerMessage> messages;
public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
final Map<Long, ServerMessage> messages) {
super();
this.journalStorageManager = journalStorageManager;
this.messages = messages;
}
public void failedTransaction(final long transactionID,
final List<RecordInfo> records,
final List<RecordInfo> recordsToDelete) {
for (RecordInfo record : records) {
if (record.userRecordType == ADD_LARGE_MESSAGE) {
byte[] data = record.data;
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
try {
LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff);
serverMessage.decrementDelayDeletionCount();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalError(e);
}
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
public final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
private AbstractJournalStorageManager journalStorageManager;
public List<Long> confirmedMessages = new LinkedList<Long>();
public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager journalStorageManager) {
this.journalStorageManager = journalStorageManager;
}
@Override
public void afterRollback(Transaction tx) {
for (Long msg : confirmedMessages) {
try {
journalStorageManager.confirmPendingLargeMessage(msg);
}
catch (Throwable e) {
ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
}
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.utils.DataConstants;
public class CursorAckRecordEncoding implements EncodingSupport {
public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
this.queueID = queueID;
this.position = position;
}
public CursorAckRecordEncoding() {
this.position = new PagePositionImpl();
}
@Override
public String toString() {
return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
}
public long queueID;
public PagePosition position;
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(position.getPageNr());
buffer.writeInt(position.getMessageNr());
}
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
long pageNR = buffer.readLong();
int messageNR = buffer.readInt();
this.position = new PagePositionImpl(pageNR, messageNR);
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class DeleteEncoding implements EncodingSupport {
public byte recordType;
public long id;
public DeleteEncoding(final byte recordType, final long id) {
this.recordType = recordType;
this.id = id;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
@Override
public int getEncodeSize() {
return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
*/
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeByte(recordType);
buffer.writeLong(id);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
*/
@Override
public void decode(ActiveMQBuffer buffer) {
recordType = buffer.readByte();
id = buffer.readLong();
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class DeliveryCountUpdateEncoding implements EncodingSupport {
public long queueID;
public int count;
public DeliveryCountUpdateEncoding() {
super();
}
public DeliveryCountUpdateEncoding(final long queueID, final int count) {
super();
this.queueID = queueID;
this.count = count;
}
public void decode(final ActiveMQBuffer buffer) {
queueID = buffer.readLong();
count = buffer.readInt();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeInt(count);
}
public int getEncodeSize() {
return 8 + 4;
}
@Override
public String toString() {
return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.nio.ByteBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.UUID;
public class DuplicateIDEncoding implements EncodingSupport {
public SimpleString address;
public byte[] duplID;
public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
this.address = address;
this.duplID = duplID;
}
public DuplicateIDEncoding() {
}
public void decode(final ActiveMQBuffer buffer) {
address = buffer.readSimpleString();
int size = buffer.readInt();
duplID = new byte[size];
buffer.readBytes(duplID);
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);
buffer.writeInt(duplID.length);
buffer.writeBytes(duplID);
}
public int getEncodeSize() {
return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
}
@Override
public String toString() {
// this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
// and this may be useful to validate the journal on those tests
// You may uncomment these two lines on that case and replcate the toString for the PrintData
// SimpleString simpleStr = new SimpleString(duplID);
// return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
String bridgeRepresentation = null;
// The bridge will generate IDs on these terms:
// This will make them easier to read
if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
try {
ByteBuffer buff = ByteBuffer.wrap(duplID);
// 16 for UUID
byte[] bytesUUID = new byte[16];
buff.get(bytesUUID);
UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
long id = buff.getLong();
bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
}
catch (Throwable ignored) {
bridgeRepresentation = null;
}
}
if (bridgeRepresentation != null) {
return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
bridgeRepresentation + "]";
}
else {
return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
/**
* This is only used when loading a transaction.
* <p>
* it might be possible to merge the functionality of this class with
* {@link FinishPageMessageOperation}
*/
// TODO: merge this class with the one on the PagingStoreImpl
public class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
@Override
public void afterCommit(final Transaction tx) {
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (pageTransaction != null) {
pageTransaction.commit();
}
}
@Override
public void afterRollback(final Transaction tx) {
PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
if (tx.getState() == Transaction.State.PREPARED && pageTransaction != null) {
pageTransaction.rollback();
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
public class GroupingEncoding implements EncodingSupport, GroupingInfo {
public long id;
public SimpleString groupId;
public SimpleString clusterName;
public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName) {
this.id = id;
this.groupId = groupId;
this.clusterName = clusterName;
}
public GroupingEncoding() {
}
public int getEncodeSize() {
return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(groupId);
buffer.writeSimpleString(clusterName);
}
public void decode(final ActiveMQBuffer buffer) {
groupId = buffer.readSimpleString();
clusterName = buffer.readSimpleString();
}
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
public SimpleString getGroupId() {
return groupId;
}
public SimpleString getClusterName() {
return clusterName;
}
@Override
public String toString() {
return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.XidCodecSupport;
public class HeuristicCompletionEncoding implements EncodingSupport {
public Xid xid;
public boolean isCommit;
@Override
public String toString() {
return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
}
public HeuristicCompletionEncoding(final Xid xid, final boolean isCommit) {
this.xid = xid;
this.isCommit = isCommit;
}
public HeuristicCompletionEncoding() {
}
public void decode(final ActiveMQBuffer buffer) {
xid = XidCodecSupport.decodeXid(buffer);
isCommit = buffer.readBoolean();
}
public void encode(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
buffer.writeBoolean(isCommit);
}
public int getEncodeSize() {
return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
public class LargeMessageEncoding implements EncodingSupport {
public final LargeServerMessage message;
public LargeMessageEncoding(final LargeServerMessage message) {
this.message = message;
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void decode(final ActiveMQBuffer buffer) {
message.decodeHeadersAndProperties(buffer);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void encode(final ActiveMQBuffer buffer) {
message.encode(buffer);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
public int getEncodeSize() {
return message.getEncodeSize();
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountPendingImpl implements EncodingSupport, PageCountPending {
@Override
public String toString() {
return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
}
public PageCountPendingImpl() {
}
public PageCountPendingImpl(long queueID, long pageID, int inc) {
this.queueID = queueID;
this.pageID = pageID;
}
long id;
long queueID;
long pageID;
public void setID(long id) {
this.id = id;
}
public long getID() {
return id;
}
public long getQueueID() {
return queueID;
}
public long getPageID() {
return pageID;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(pageID);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
pageID = buffer.readLong();
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountRecord implements EncodingSupport {
private long queueID;
private long value;
@Override
public String toString() {
return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
}
public PageCountRecord() {
}
public PageCountRecord(long queueID, long value) {
this.queueID = queueID;
this.value = value;
}
public long getQueueID() {
return queueID;
}
public long getValue() {
return value;
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG * 2;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeLong(value);
}
@Override
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readLong();
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageCountRecordInc implements EncodingSupport {
private long queueID;
private int value;
@Override
public String toString() {
return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
}
public PageCountRecordInc() {
}
public PageCountRecordInc(long queueID, int value) {
this.queueID = queueID;
this.value = value;
}
public long getQueueID() {
return queueID;
}
public int getValue() {
return value;
}
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
buffer.writeInt(value);
}
public void decode(ActiveMQBuffer buffer) {
queueID = buffer.readLong();
value = buffer.readInt();
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.utils.DataConstants;
public class PageUpdateTXEncoding implements EncodingSupport {
public long pageTX;
public int recods;
@Override
public String toString() {
return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
}
public PageUpdateTXEncoding() {
}
public PageUpdateTXEncoding(final long pageTX, final int records) {
this.pageTX = pageTX;
this.recods = records;
}
public void decode(ActiveMQBuffer buffer) {
this.pageTX = buffer.readLong();
this.recods = buffer.readInt();
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(pageTX);
buffer.writeInt(recods);
}
@Override
public int getEncodeSize() {
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.DataConstants;
public class PendingLargeMessageEncoding implements EncodingSupport {
public long largeMessageID;
public PendingLargeMessageEncoding(final long pendingLargeMessageID) {
this.largeMessageID = pendingLargeMessageID;
}
public PendingLargeMessageEncoding() {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void decode(final ActiveMQBuffer buffer) {
largeMessageID = buffer.readLong();
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
*/
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(largeMessageID);
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
*/
public int getEncodeSize() {
return DataConstants.SIZE_LONG;
}
@Override
public String toString() {
return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.utils.DataConstants;
public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
public long id;
public SimpleString name;
public SimpleString address;
public SimpleString filterString;
public boolean autoCreated;
public SimpleString user;
public PersistentQueueBindingEncoding() {
}
@Override
public String toString() {
return "PersistentQueueBindingEncoding [id=" + id +
", name=" +
name +
", address=" +
address +
", filterString=" +
filterString +
", user=" +
user +
", autoCreated=" +
autoCreated +
"]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
final SimpleString filterString,
final SimpleString user,
final boolean autoCreated) {
this.name = name;
this.address = address;
this.filterString = filterString;
this.user = user;
this.autoCreated = autoCreated;
}
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
public SimpleString getAddress() {
return address;
}
public void replaceQueueName(SimpleString newName) {
this.name = newName;
}
public SimpleString getFilterString() {
return filterString;
}
public SimpleString getQueueName() {
return name;
}
public SimpleString getUser() {
return user;
}
public boolean isAutoCreated() {
return autoCreated;
}
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
address = buffer.readSimpleString();
filterString = buffer.readNullableSimpleString();
String metadata = buffer.readNullableSimpleString().toString();
if (metadata != null) {
String[] elements = metadata.split(";");
for (String element : elements) {
String[] keyValuePair = element.split("=");
if (keyValuePair.length == 2) {
if (keyValuePair[0].equals("user")) {
user = SimpleString.toSimpleString(keyValuePair[1]);
}
}
}
}
autoCreated = buffer.readBoolean();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
buffer.writeSimpleString(address);
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
}
public int getEncodeSize() {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
SimpleString.sizeofNullableString(createMetadata());
}
private SimpleString createMetadata() {
StringBuilder metadata = new StringBuilder();
metadata.append("user=").append(user).append(";");
return SimpleString.toSimpleString(metadata.toString());
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class QueueEncoding implements EncodingSupport {
public long queueID;
public QueueEncoding(final long queueID) {
super();
this.queueID = queueID;
}
public QueueEncoding() {
super();
}
public void decode(final ActiveMQBuffer buffer) {
queueID = buffer.readLong();
}
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(queueID);
}
public int getEncodeSize() {
return 8;
}
@Override
public String toString() {
return "QueueEncoding [queueID=" + queueID + "]";
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
public class RefEncoding extends QueueEncoding {
public RefEncoding() {
super();
}
public RefEncoding(final long queueID) {
super(queueID);
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
public class ScheduledDeliveryEncoding extends QueueEncoding {
public long scheduledDeliveryTime;
@Override
public String toString() {
return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]";
}
public ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {
super(queueID);
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
public ScheduledDeliveryEncoding() {
}
@Override
public int getEncodeSize() {
return super.getEncodeSize() + 8;
}
@Override
public void encode(final ActiveMQBuffer buffer) {
super.encode(buffer);
buffer.writeLong(scheduledDeliveryTime);
}
@Override
public void decode(final ActiveMQBuffer buffer) {
super.decode(buffer);
scheduledDeliveryTime = buffer.readLong();
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.XidCodecSupport;
/**
* It's public as other classes may want to unparse data on tools
*/
public class XidEncoding implements EncodingSupport {
public final Xid xid;
public XidEncoding(final Xid xid) {
this.xid = xid;
}
public XidEncoding(final byte[] data) {
xid = XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data));
}
public void decode(final ActiveMQBuffer buffer) {
throw new IllegalStateException("Non Supported Operation");
}
public void encode(final ActiveMQBuffer buffer) {
XidCodecSupport.encodeXid(xid, buffer);
}
public int getEncodeSize() {
return XidCodecSupport.getXidEncodeLength(xid);
}
}

View File

@ -22,7 +22,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
/**
@ -40,8 +40,8 @@ public class ReplicationStartSyncMessage extends PacketImpl {
private boolean allowsAutoFailBack;
public enum SyncDataType {
JournalBindings(JournalContent.BINDINGS.typeByte),
JournalMessages(JournalContent.MESSAGES.typeByte),
JournalBindings(AbstractJournalStorageManager.JournalContent.BINDINGS.typeByte),
JournalMessages(AbstractJournalStorageManager.JournalContent.MESSAGES.typeByte),
LargeMessages((byte) 2);
private byte code;
@ -50,8 +50,8 @@ public class ReplicationStartSyncMessage extends PacketImpl {
this.code = code;
}
public static JournalContent getJournalContentType(SyncDataType dataType) {
return JournalContent.getType(dataType.code);
public static AbstractJournalStorageManager.JournalContent getJournalContentType(SyncDataType dataType) {
return AbstractJournalStorageManager.JournalContent.getType(dataType.code);
}
public static SyncDataType getDataType(byte code) {
@ -86,7 +86,7 @@ public class ReplicationStartSyncMessage extends PacketImpl {
}
public ReplicationStartSyncMessage(JournalFile[] datafiles,
JournalContent contentType,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) {
this();

View File

@ -23,7 +23,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
/**
@ -35,7 +35,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
/**
* The JournalType or {@code null} if sync'ing large-messages.
*/
private JournalContent journalType;
private AbstractJournalStorageManager.JournalContent journalType;
/**
* This value refers to {@link org.apache.activemq.artemis.core.journal.impl.JournalFile#getFileID()}, or the
* message id if we are sync'ing a large-message.
@ -74,7 +74,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
super(REPLICATION_SYNC_FILE);
}
public ReplicationSyncFileMessage(JournalContent content,
public ReplicationSyncFileMessage(AbstractJournalStorageManager.JournalContent content,
SimpleString storeName,
long id,
int size,
@ -135,7 +135,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
fileId = buffer.readLong();
switch (FileType.getFileType(buffer.readByte())) {
case JOURNAL: {
journalType = JournalContent.getType(buffer.readByte());
journalType = AbstractJournalStorageManager.JournalContent.getType(buffer.readByte());
fileType = FileType.JOURNAL;
break;
}
@ -160,7 +160,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
return fileId;
}
public JournalContent getJournalContent() {
public AbstractJournalStorageManager.JournalContent getJournalContent() {
return journalType;
}

View File

@ -47,7 +47,7 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageInSync;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;

View File

@ -32,12 +32,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@ -437,7 +437,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @throws ActiveMQException
* @throws Exception
*/
public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception {
public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
if (!enabled) {
return;
}
@ -473,7 +473,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
private void sendLargeFile(JournalContent content,
private void sendLargeFile(AbstractJournalStorageManager.JournalContent content,
SimpleString pageStore,
final long id,
SequentialFile file,
@ -536,7 +536,7 @@ public final class ReplicationManager implements ActiveMQComponent {
* @throws ActiveMQException
*/
public void sendStartSyncMessage(JournalFile[] datafiles,
JournalContent contentType,
AbstractJournalStorageManager.JournalContent contentType,
String nodeID,
boolean allowsAutoFailBack) throws ActiveMQException {
if (enabled)

View File

@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@ -70,6 +71,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
@ -1479,7 +1481,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/
private StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
return new JDBCJournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
}
// Default to File Based Storage Manager, (Legacy default configuration).
else {
return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
}
}
return new NullStorageManager();
}

View File

@ -683,6 +683,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="store" type="storeType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The Store Type used by the server
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="security-settings" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -1437,6 +1445,55 @@
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="storeType">
<xsd:choice>
<xsd:element name="file-store" type="fileStoreType" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
Use a file based store for peristing journal, paging and large messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="database-store" type="databaseStoreType" minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
Use a database for persisting journal, paging and large messages
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:choice>
</xsd:complexType>
<xsd:complexType name="fileStoreType">
</xsd:complexType>
<xsd:complexType name="databaseStoreType">
<xsd:all>
<xsd:element name="jdbc-connection-url" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The JDBC Connection URL e.g. jdbc:mysql://localhost:3306/
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="message-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to store message journal entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="bindings-table-name" type="xsd:string" minOccurs="1" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
The table name used to store bindings journal entries
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
</xsd:complexType>
<xsd:complexType name="haPolicyType">
<xsd:choice>
<xsd:element name="live-only" type="haLiveOnlyPolicyType" minOccurs="0" maxOccurs="1">
@ -1489,6 +1546,7 @@
</xsd:choice>
</xsd:complexType>
<xsd:complexType name="haColocationReplicationType">
<xsd:all>
<xsd:element name="request-backup" type="xsd:boolean" minOccurs="0" maxOccurs="1" default="false">

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.impl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
public class DatabaseStoreConfigurationTest extends ActiveMQTestBase {
@Test
public void databaseStoreConfigTest() throws Exception {
Configuration configuration = createConfiguration("database-store-config.xml");
ActiveMQServerImpl server = new ActiveMQServerImpl(configuration);
assertEquals(StoreConfiguration.StoreType.DATABASE, server.getConfiguration().getStoreConfiguration().getStoreType());
}
protected Configuration createConfiguration(String fileName) throws Exception {
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(fileName);
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
// we need this otherwise the data folder will be located under activemq-server and not on the temporary directory
fc.setPagingDirectory(getTestDir() + "/" + fc.getPagingDirectory());
fc.setLargeMessagesDirectory(getTestDir() + "/" + fc.getLargeMessagesDirectory());
fc.setJournalDirectory(getTestDir() + "/" + fc.getJournalDirectory());
fc.setBindingsDirectory(getTestDir() + "/" + fc.getBindingsDirectory());
return fc;
}
}

View File

@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -387,6 +388,19 @@ public abstract class ActiveMQTestBase extends Assert {
return createDefaultConfig(0, netty);
}
protected Configuration createDefaultJDBCConfig() throws Exception {
Configuration configuration = createDefaultConfig(true);
DatabaseStorageConfiguration dbStorageConfiguration = new DatabaseStorageConfiguration();
dbStorageConfiguration.setJdbcConnectionUrl(getTestJDBCConnectionUrl());
dbStorageConfiguration.setBindingsTableName("BINDINGS");
dbStorageConfiguration.setMessageTableName("MESSAGES");
configuration.setStoreConfiguration(dbStorageConfiguration);
return configuration;
}
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
ConfigurationImpl configuration = createBasicConfig(serverID).setJMXManagementEnabled(false).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, generateInVMParams(serverID)));
@ -749,6 +763,10 @@ public abstract class ActiveMQTestBase extends Assert {
return testDir;
}
protected final String getTestJDBCConnectionUrl() {
return "jdbc:derby:" + getTestDir() + File.separator + "derby;create=true";
}
protected final File getTestDirfile() {
return new File(testDir);
}

View File

@ -0,0 +1,30 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration
xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq ../../main/resources/schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
</database-store>
</store>
</core>
</configuration>

View File

@ -3,12 +3,14 @@
In this chapter we will describe how persistence works with Apache ActiveMQ Artemis and
how to configure it.
Apache ActiveMQ Artemis ships with a high performance journal. Since Apache ActiveMQ Artemis handles
its own persistence, rather than relying on a database or other 3rd
party persistence engine it is very highly optimised for the specific
messaging use cases.
Apache ActiveMQ Artemis ships with two persistence options. The Apache ActiveMQ Artemis File journal
which is highly optimized for the messaging use case and gives great performance, and also Apache Artemis
JDBC Store, which uses JDBC to connect to a database of your choice. The JDBC Store is still under development,
but it is possible to use it's journal features, (essentially everything except for paging and large messages).
An Apache ActiveMQ Artemis journal is an *append only* journal. It consists of a set of
## Apache ActiveMQ Artemis File Journal (Default)
An Apache ActiveMQ Artemis file journal is an *append only* journal. It consists of a set of
files on disk. Each file is pre-created to a fixed size and initially
filled with padding. As operations are performed on the server, e.g. add
message, update message, delete message, records are appended to the
@ -126,7 +128,7 @@ If no persistence is required at all, Apache ActiveMQ Artemis can also be config
not to persist any data at all to storage as discussed in the Configuring
the broker for Zero Persistence section.
## Configuring the bindings journal
### Configuring the bindings journal
The bindings journal is configured using the following attributes in
`broker.xml`
@ -143,11 +145,11 @@ The bindings journal is configured using the following attributes in
`bindings-directory` if it does not already exist. The default value
is `true`
## Configuring the jms journal
### Configuring the jms journal
The jms config shares its configuration with the bindings journal.
## Configuring the message journal
### Configuring the message journal
The message journal is configured using the following attributes in
`broker.xml`
@ -297,7 +299,7 @@ The message journal is configured using the following attributes in
The default for this parameter is `30`
## An important note on disabling disk write cache.
### An important note on disabling disk write cache.
> **Warning**
>
@ -336,7 +338,7 @@ The message journal is configured using the following attributes in
> On Windows you can check / change the setting by right clicking on the
> disk and clicking properties.
## Installing AIO
### Installing AIO
The Java NIO journal gives great performance, but If you are running
Apache ActiveMQ Artemis using Linux Kernel 2.6 or later, we highly recommend you use
@ -356,6 +358,40 @@ Using aptitude, (e.g. on Ubuntu or Debian system):
apt-get install libaio
## Apache ActiveMQ Artemis JDBC Persistence
The Apache ActiveMQ Artemis JDBC persistence store is still under development and only supports persistence of standard messages and bindings (this is everything except large messages and paging). The JDBC store uses a JDBC connection to store messages and bindings data in records in database tables. The data stored in the database tables is encoded using Apache ActiveMQ Artemis journal encoding.
### Configuring JDBC Persistence
To configure Apache ActiveMQ Artemis to use a database for persisting messages and bindings data you must do two things.
1. Add the appropriate JDBC client libraries to the Artemis runtime. You can do this by dropping the relevant jars in the lib folder of the ActiveMQ Artemis distribution.
2. create a store element in your broker.xml config file under the <core> element. For example:
```xml
<store>
<database-store>
<jdbc-connection-url>jdbc:derby:target/derby/database-store;create=true</jdbc-connection-url>
<bindings-table-name>BINDINGS_TABLE</bindings-table-name>
<message-table-name>MESSAGE_TABLE</message-table-name>
</database-store>
</store>
```
- `jdbc-connection-url`
The full JDBC connection URL for your database server. The connection url should include all configuration parameters and database name.
- `bindings-table-name`
The name of the table in which bindings data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
- `message-table-name`
The name of the table in which messages and related data will be persisted for the ActiveMQ Artemis server. Specifying table names allows users to share single database amongst multiple servers, without interference.
## Configuring Apache ActiveMQ Artemis for Zero Persistence
In some situations, zero persistence is sometimes required for a
@ -366,3 +402,5 @@ straightforward. Simply set the parameter `persistence-enabled` in
Please note that if you set this parameter to false, then *zero*
persistence will occur. That means no bindings data, message data, large
message data, duplicate id caches or paging data will be persisted.

13
pom.xml
View File

@ -47,6 +47,7 @@
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-jdbc-store</module>
<module>artemis-maven-plugin</module>
<module>artemis-server-osgi</module>
<module>integration/activemq-spring-integration</module>
@ -82,6 +83,7 @@
<resteasy.version>3.0.13.Final</resteasy.version>
<proton.version>0.10</proton.version>
<fuse.mqtt.client.version>1.10</fuse.mqtt.client.version>
<apache.derby.version>10.11.1.1</apache.derby.version>
<skipUnitTests>true</skipUnitTests>
<skipJmsTests>true</skipJmsTests>
<skipExtraTests>true</skipExtraTests>
@ -202,6 +204,11 @@
<version>${fuse.mqtt.client.version}</version>
<!-- Apache v2.0 License -->
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${apache.derby.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>mqtt-client</artifactId>
@ -588,10 +595,12 @@
<module>artemis-native</module>
<module>artemis-protocols</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
<module>artemis-maven-plugin</module>
<module>artemis-jdbc-store</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
@ -623,6 +632,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -681,6 +691,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -723,6 +734,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>
@ -757,6 +769,7 @@
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
<module>artemis-service-extensions</module>

View File

@ -117,6 +117,11 @@
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-amqp-protocol</artifactId>

View File

@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
public class FakeEncodingSupportImpl implements EncodingSupport {
private byte[] data;
public FakeEncodingSupportImpl(byte[] data) {
this.data = data;
}
@Override
public int getEncodeSize() {
return data.length;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeBytes(data);
}
@Override
public void decode(ActiveMQBuffer buffer) {
data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class JDBCJournalTest {
private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
private JDBCJournalImpl journal;
private String jdbcUrl;
private Properties jdbcConnectionProperties;
@Before
public void setup() throws Exception {
jdbcUrl = "jdbc:derby:target/data;create=true";
journal = new JDBCJournalImpl(jdbcUrl, JOURNAL_TABLE_NAME);
journal.start();
}
@Test
public void testInsertRecords() throws Exception {
int noRecords = 10;
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(1, (byte) 1, new byte[0], true);
}
Thread.sleep(3000);
assertEquals(noRecords, journal.getNumberOfRecords());
}
@Test
public void testCallbacks() throws Exception {
final int noRecords = 10;
final CountDownLatch done = new CountDownLatch(noRecords);
IOCompletion completion = new IOCompletion() {
@Override
public void storeLineUp() {
}
@Override
public void done() {
done.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(1, (byte) 1, new FakeEncodingSupportImpl(new byte[0]), true, completion);
}
journal.sync();
done.await(5, TimeUnit.SECONDS);
assertEquals(done.getCount(), 0);
}
@Test
public void testReadJournal() throws Exception {
int noRecords = 100;
// Standard Add Records
for (int i = 0; i < noRecords; i++) {
journal.appendAddRecord(i, (byte) i, new byte[i], true);
}
// TX Records
int noTx = 10;
int noTxRecords = 100;
for (int i = 1000; i < 1000 + noTx; i++) {
for (int j = 0; j < noTxRecords; j++) {
journal.appendAddRecordTransactional(i, Long.valueOf(i + "" + j), (byte) 1, new byte[0]);
}
journal.appendPrepareRecord(i, new byte[0], true);
journal.appendCommitRecord(i, true);
}
Thread.sleep(2000);
List<RecordInfo> recordInfos = new ArrayList<>();
List<PreparedTransactionInfo> txInfos = new ArrayList<>();
journal.load(recordInfos, txInfos, null);
assertEquals(noRecords + (noTxRecords * noTx), recordInfos.size());
}
@After
public void tearDown() throws Exception {
journal.destroy();
}
}

View File

@ -16,22 +16,30 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AddressSettingsConfigurationStorageTest extends StorageManagerTestBase {
private Map<SimpleString, PersistedAddressSetting> mapExpectedAddresses;
public AddressSettingsConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {
@ -40,7 +48,7 @@ public class AddressSettingsConfigurationStorageTest extends StorageManagerTestB
mapExpectedAddresses = new HashMap<>();
}
protected void addAddress(JournalStorageManager journal1, String address, AddressSettings setting) throws Exception {
protected void addAddress(StorageManager journal1, String address, AddressSettings setting) throws Exception {
SimpleString str = new SimpleString(address);
PersistedAddressSetting persistedSetting = new PersistedAddressSetting(str, setting);
mapExpectedAddresses.put(str, persistedSetting);
@ -84,7 +92,7 @@ public class AddressSettingsConfigurationStorageTest extends StorageManagerTestB
* @param journal1
* @throws Exception
*/
private void checkAddresses(JournalStorageManager journal1) throws Exception {
private void checkAddresses(StorageManager journal1) throws Exception {
List<PersistedAddressSetting> listSetting = journal1.recoverAddressSettings();
assertEquals(mapExpectedAddresses.size(), listSetting.size());

View File

@ -17,12 +17,13 @@
package org.apache.activemq.artemis.tests.integration.persistence;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -30,8 +31,11 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.PostOfficeJournalLoader;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;
public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
@ -39,6 +43,17 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase {
ArrayList<Long> deletedMessage = new ArrayList<>();
public DeleteMessagesOnStartupTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
// This is only applicable for FILE based store, as the database storage manager will automatically delete records.
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}};
return Arrays.asList(params);
}
@Test
public void testDeleteMessagesOnStartup() throws Exception {
createStorage();

View File

@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@ -32,6 +33,10 @@ import org.junit.Test;
public class DuplicateCacheTest extends StorageManagerTestBase {
public DuplicateCacheTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@After
@Override
public void tearDown() throws Exception {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.jms.persistence.config.PersistedConnectionFactory;
import org.apache.activemq.artemis.jms.server.config.ConnectionFactoryConfiguration;
@ -36,6 +37,10 @@ public class JMSConnectionFactoryConfigurationStorageTest extends StorageManager
private Map<String, PersistedConnectionFactory> mapExpectedCFs;
public JMSConnectionFactoryConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.junit.Assert;
import org.junit.Test;
@ -27,6 +28,10 @@ import org.apache.activemq.artemis.jms.persistence.config.PersistedType;
public class JMSStorageManagerTest extends StorageManagerTestBase {
public JMSStorageManagerTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
//https://issues.jboss.org/browse/HORNETQ-812
@Test
public void testJNDIPersistence() throws Exception {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.junit.Before;
import org.junit.Test;
@ -31,6 +32,10 @@ public class RolesConfigurationStorageTest extends StorageManagerTestBase {
private Map<SimpleString, PersistedRoles> mapExpectedSets;
public RolesConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
super(storeType);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -16,9 +16,19 @@
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JDBCJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.jms.persistence.JMSStorageManager;
import org.apache.activemq.artemis.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
@ -29,23 +39,39 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.TimeAndCounterIDGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
@RunWith(Parameterized.class)
public abstract class StorageManagerTestBase extends ActiveMQTestBase {
protected ExecutorService executor;
protected ExecutorFactory execFactory;
protected JournalStorageManager journal;
protected StorageManager journal;
protected JMSStorageManager jmsJournal;
protected StoreConfiguration.StoreType storeType;
public StorageManagerTestBase(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType")
public static Collection<Object[]> data() {
Object[][] params = new Object[][] {{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
if (storeType == StoreConfiguration.StoreType.DATABASE) {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
}
super.setUp();
execFactory = getOrderedExecutor();
@ -79,6 +105,15 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal = null;
}
// Stops the database engine early to stop thread leaks showing.
if (storeType == StoreConfiguration.StoreType.DATABASE) {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (SQLException e) {
}
}
super.tearDown();
if (exception != null)
throw exception;
@ -88,7 +123,13 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
* @throws Exception
*/
protected void createStorage() throws Exception {
journal = createJournalStorageManager(createDefaultInVMConfig());
if (storeType == StoreConfiguration.StoreType.DATABASE) {
journal = createJDBCJournalStorageManager(createDefaultJDBCConfig());
}
else {
journal = createJournalStorageManager(createDefaultInVMConfig());
}
journal.start();
@ -106,6 +147,15 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
return jsm;
}
/**
* @param configuration
*/
protected JDBCJournalStorageManager createJDBCJournalStorageManager(Configuration configuration) {
JDBCJournalStorageManager jsm = new JDBCJournalStorageManager(configuration, execFactory, null);
addActiveMQComponent(jsm);
return jsm;
}
/**
* @throws Exception
*/
@ -115,4 +165,5 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase {
jmsJournal.start();
jmsJournal.load();
}
}

View File

@ -839,10 +839,10 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
setupAndLoadJournal(JOURNAL_SIZE, 1);
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(1, transactions.get(0).recordsToDelete.size());
Assert.assertEquals(1, transactions.get(0).getRecordsToDelete().size());
Assert.assertEquals(1, records.size());
for (RecordInfo record : transactions.get(0).recordsToDelete) {
for (RecordInfo record : transactions.get(0).getRecordsToDelete()) {
byte[] data = record.data;
Assert.assertEquals(100, data.length);
for (byte element : data) {
@ -850,10 +850,10 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
}
}
Assert.assertEquals(10, transactions.get(0).extraData.length);
Assert.assertEquals(10, transactions.get(0).getExtraData().length);
for (int i = 0; i < 10; i++) {
Assert.assertEquals((byte) 1, transactions.get(0).extraData[i]);
Assert.assertEquals((byte) 1, transactions.get(0).getExtraData()[i]);
}
journalImpl.appendCommitRecord(1L, false);
@ -894,9 +894,9 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, records.size());
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(10, transactions.get(0).extraData.length);
Assert.assertEquals(10, transactions.get(0).getExtraData().length);
for (int i = 0; i < 10; i++) {
Assert.assertEquals((byte) 1, transactions.get(0).extraData[i]);
Assert.assertEquals((byte) 1, transactions.get(0).getExtraData()[i]);
}
journalImpl.checkReclaimStatus();
@ -925,9 +925,9 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(1, transactions.size());
Assert.assertEquals(15, transactions.get(0).extraData.length);
Assert.assertEquals(15, transactions.get(0).getExtraData().length);
for (byte element : transactions.get(0).extraData) {
for (byte element : transactions.get(0).getExtraData()) {
Assert.assertEquals(2, element);
}

View File

@ -265,9 +265,9 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
if (entry.getValue().prepared) {
PreparedTransactionInfo info = new PreparedTransactionInfo(entry.getKey(), null);
info.records.addAll(entry.getValue().records);
info.getRecords().addAll(entry.getValue().records);
info.recordsToDelete.addAll(entry.getValue().deletes);
info.getRecordsToDelete().addAll(entry.getValue().deletes);
prepared.add(info);
}
@ -465,15 +465,15 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
PreparedTransactionInfo ractual = iterActual.next();
Assert.assertEquals("ids not same", rexpected.id, ractual.id);
Assert.assertEquals("ids not same", rexpected.getId(), ractual.getId());
checkRecordsEquivalent(rexpected.records, ractual.records);
checkRecordsEquivalent(rexpected.getRecords(), ractual.getRecords());
Assert.assertEquals("deletes size not same", rexpected.recordsToDelete.size(), ractual.recordsToDelete.size());
Assert.assertEquals("deletes size not same", rexpected.getRecordsToDelete().size(), ractual.getRecordsToDelete().size());
Iterator<RecordInfo> iterDeletesExpected = rexpected.recordsToDelete.iterator();
Iterator<RecordInfo> iterDeletesExpected = rexpected.getRecordsToDelete().iterator();
Iterator<RecordInfo> iterDeletesActual = ractual.recordsToDelete.iterator();
Iterator<RecordInfo> iterDeletesActual = ractual.getRecordsToDelete().iterator();
while (iterDeletesExpected.hasNext()) {
long lexpected = iterDeletesExpected.next().id;