ARTEMIS-27 / ARTEMIS-339 Added JDBC Journal Support

This commit is contained in:
Martyn Taylor 2016-01-06 14:16:44 +00:00 committed by Clebert Suconic
parent 9b351d8236
commit 9dd9c021a0
9 changed files with 1323 additions and 0 deletions

View File

@ -132,6 +132,11 @@
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jdbc-store</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-website</artifactId>

View File

@ -54,6 +54,7 @@
<include>org.apache.activemq:artemis-jms-client</include>
<include>org.apache.activemq:artemis-jms-server</include>
<include>org.apache.activemq:artemis-journal</include>
<include>org.apache.activemq:artemis-jdbc-store</include>
<include>org.apache.activemq:artemis-native</include>
<include>org.apache.activemq:artemis-amqp-protocol</include>
<include>org.apache.activemq:artemis-openwire-protocol</include>
@ -93,6 +94,7 @@
<include>org.fusesource.hawtbuf:hawtbuf</include>
<include>org.jgroups:jgroups</include>
<include>io.netty:netty-codec-mqtt</include>
<include>org.apache.derby:derby</include>
</includes>
<!--excludes>
<exclude>org.apache.activemq:artemis-website</exclude>

View File

@ -0,0 +1,74 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>1.2.1-SNAPSHOT</version>
</parent>
<artifactId>artemis-jdbc-store</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JDBC Store</name>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,607 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.derby.jdbc.AutoloadedDriver;
public class JDBCJournalImpl implements Journal {
// Sync Delay in ms
public static final int SYNC_DELAY = 500;
private static int USER_VERSION = 1;
private final String tableName;
private Connection connection;
private List<JDBCJournalRecord> records;
private PreparedStatement insertJournalRecords;
private PreparedStatement selectJournalRecords;
private PreparedStatement countJournalRecords;
private PreparedStatement deleteJournalRecords;
private PreparedStatement deleteTxJournalRecords;
private boolean started;
private String jdbcUrl;
private Timer syncTimer;
private Driver dbDriver;
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
private boolean isDerby = false;
public JDBCJournalImpl(String jdbcUrl, String tableName) {
this.tableName = tableName;
this.jdbcUrl = jdbcUrl;
records = new ArrayList<JDBCJournalRecord>();
}
@Override
public void start() throws Exception {
// Load Database driver, sets Derby Autoloaded Driver as lowest priority.
List<Driver> drivers = Collections.list(DriverManager.getDrivers());
if (drivers.size() <= 2 && drivers.size() > 0) {
dbDriver = drivers.get(0);
isDerby = dbDriver instanceof AutoloadedDriver;
if (drivers.size() > 1 && isDerby) {
dbDriver = drivers.get(1);
}
if (isDerby) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
}
catch (Exception e) {
}
}
});
}
}
else {
String error = drivers.isEmpty() ? "No DB driver found on class path" : "Too many DB drivers on class path, not sure which to use";
throw new RuntimeException(error);
}
connection = dbDriver.connect(jdbcUrl, new Properties());
// If JOURNAL table doesn't exist then create it
ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null);
if (!rs.next()) {
Statement statement = connection.createStatement();
statement.executeUpdate(JDBCJournalRecord.createTableSQL(tableName));
}
insertJournalRecords = connection.prepareStatement(JDBCJournalRecord.insertRecordsSQL(tableName));
selectJournalRecords = connection.prepareStatement(JDBCJournalRecord.selectRecordsSQL(tableName));
countJournalRecords = connection.prepareStatement("SELECT COUNT(*) FROM " + tableName);
deleteJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteRecordsSQL(tableName));
deleteTxJournalRecords = connection.prepareStatement(JDBCJournalRecord.deleteTxRecordsSQL(tableName));
syncTimer = new Timer();
syncTimer.scheduleAtFixedRate(new JDBCJournalSync(this), SYNC_DELAY * 2, SYNC_DELAY);
started = true;
}
@Override
public void stop() throws Exception {
stop(true);
}
public synchronized void stop(boolean shutdownConnection) throws Exception {
if (started) {
syncTimer.cancel();
sync();
if (shutdownConnection) {
connection.close();
}
started = false;
}
}
public synchronized void destroy() throws Exception {
connection.setAutoCommit(false);
Statement statement = connection.createStatement();
statement.executeUpdate("DROP TABLE " + tableName);
connection.commit();
stop();
}
public int sync() throws SQLException {
List<JDBCJournalRecord> recordRef = records;
records = new ArrayList<JDBCJournalRecord>();
for (JDBCJournalRecord record : recordRef) {
record.storeLineUp();
switch (record.getRecordType()) {
case JDBCJournalRecord.DELETE_RECORD:
record.writeDeleteRecord(deleteJournalRecords);
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
record.writeDeleteTxRecord(deleteTxJournalRecords);
break;
default:
record.writeRecord(insertJournalRecords);
break;
}
}
boolean success = false;
try {
connection.setAutoCommit(false);
insertJournalRecords.executeBatch();
deleteJournalRecords.executeBatch();
deleteTxJournalRecords.executeBatch();
connection.commit();
success = true;
}
catch (SQLException e) {
connection.rollback();
e.printStackTrace();
}
executeCallbacks(recordRef, success);
return recordRef.size();
}
// TODO Use an executor.
private void executeCallbacks(final List<JDBCJournalRecord> records, final boolean result) {
Runnable r = new Runnable() {
@Override
public void run() {
for (JDBCJournalRecord record : records) {
record.complete(result);
}
}
};
Thread t = new Thread(r);
t.start();
}
private void appendRecord(JDBCJournalRecord record) throws SQLException {
try {
journalLock.writeLock().lock();
records.add(record);
}
finally {
journalLock.writeLock().unlock();
}
}
@Override
public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendAddRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendUpdateRecord(long id,
byte recordType,
EncodingSupport record,
boolean sync,
IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
r.setSync(sync);
r.setIoCompletion(completionCallback);
appendRecord(r);
}
@Override
public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
r.setUserRecordType(recordType);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setRecord(record);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendCommitRecord(long txID,
boolean sync,
IOCompletion callback,
boolean lineUpContext) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.COMMIT_RECORD);
r.setTxId(txID);
r.setStoreLineUp(lineUpContext);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID,
EncodingSupport transactionData,
boolean sync,
IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setTxData(transactionData);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendRollbackRecord(long txID, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
r.setTxId(txID);
r.setSync(sync);
appendRecord(r);
}
@Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
r.setTxId(txID);
r.setSync(sync);
r.setIoCompletion(callback);
appendRecord(r);
}
@Override
public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception {
JournalLoadInformation jli = new JournalLoadInformation();
JDBCJournalReaderCallback jrc = new JDBCJournalReaderCallback(reloadManager);
JDBCJournalRecord r;
try (ResultSet rs = selectJournalRecords.executeQuery()) {
int noRecords = 0;
while (rs.next()) {
r = JDBCJournalRecord.readRecord(rs);
switch (r.getRecordType()) {
case JDBCJournalRecord.ADD_RECORD:
jrc.onReadAddRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD:
jrc.onReadUpdateRecord(r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD:
jrc.onReadDeleteRecord(r.getId());
break;
case JDBCJournalRecord.ADD_RECORD_TX:
jrc.onReadAddRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.UPDATE_RECORD_TX:
jrc.onReadUpdateRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.DELETE_RECORD_TX:
jrc.onReadDeleteRecordTX(r.getTxId(), r.toRecordInfo());
break;
case JDBCJournalRecord.PREPARE_RECORD:
jrc.onReadPrepareRecord(r.getTxId(), r.getTxDataAsByteArray(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.COMMIT_RECORD:
jrc.onReadCommitRecord(r.getTxId(), r.getTxCheckNoRecords());
break;
case JDBCJournalRecord.ROLLBACK_RECORD:
jrc.onReadRollbackRecord(r.getTxId());
break;
default:
throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
}
noRecords++;
}
jli.setMaxID(((JDBCJournalLoaderCallback) reloadManager).getMaxId());
jli.setNumberOfRecords(noRecords);
}
return jli;
}
@Override
public JournalLoadInformation loadInternalOnly() throws Exception {
return null;
}
@Override
public JournalLoadInformation loadSyncOnly(JournalState state) throws Exception {
return null;
}
@Override
public void lineUpContext(IOCompletion callback) {
callback.storeLineUp();
}
@Override
public JournalLoadInformation load(List<RecordInfo> committedRecords,
List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure) throws Exception {
return load(committedRecords, preparedTransactions, transactionFailure, true);
}
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) throws Exception {
JDBCJournalLoaderCallback lc = new JDBCJournalLoaderCallback(committedRecords, preparedTransactions, failureCallback, fixBadTX);
return load(lc);
}
@Override
public int getAlignment() throws Exception {
return 0;
}
@Override
public int getNumberOfRecords() {
int count = 0;
try (ResultSet rs = countJournalRecords.executeQuery()) {
rs.next();
count = rs.getInt(1);
}
catch (SQLException e) {
return -1;
}
return count;
}
@Override
public int getUserVersion() {
return USER_VERSION;
}
@Override
public void perfBlast(int pages) {
}
@Override
public void runDirectJournalBlast() throws Exception {
}
@Override
public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception {
return null;
}
public final void synchronizationLock() {
journalLock.writeLock().lock();
}
public final void synchronizationUnlock() {
journalLock.writeLock().unlock();
}
@Override
public void forceMoveNextFile() throws Exception {
}
@Override
public JournalFile[] getDataFiles() {
return new JournalFile[0];
}
@Override
public SequentialFileFactory getFileFactory() {
return null;
}
@Override
public int getFileSize() {
return 0;
}
@Override
public void scheduleCompactAndBlock(int timeout) throws Exception {
}
@Override
public void replicationSyncPreserveOldFiles() {
}
@Override
public void replicationSyncFinished() {
}
@Override
public boolean isStarted() {
return started;
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
public class JDBCJournalLoaderCallback implements LoaderCallback {
private static final int DELETE_FLUSH = 20000;
private final List<PreparedTransactionInfo> preparedTransactions;
private final TransactionFailureCallback failureCallback;
private final boolean fixBadTX;
/* We keep track of list entries for each ID. This preserves order and allows multiple record insertions with the
same ID. We use this for deleting records */
private final Map<Long, List<Integer>> deleteReferences = new HashMap<Long, List<Integer>>();
private Runtime runtime = Runtime.getRuntime();
private final List<RecordInfo> committedRecords;
private long maxId = -1;
public JDBCJournalLoaderCallback(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
final boolean fixBadTX) {
this.committedRecords = committedRecords;
this.preparedTransactions = preparedTransactions;
this.failureCallback = failureCallback;
this.fixBadTX = fixBadTX;
}
public synchronized void checkMaxId(long id) {
if (maxId < id) {
maxId = id;
}
}
public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) {
preparedTransactions.add(preparedTransaction);
}
public synchronized void addRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
ArrayList<Integer> indexes = new ArrayList<Integer>();
indexes.add(index);
deleteReferences.put(info.id, indexes);
checkMaxId(info.id);
}
public synchronized void updateRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
deleteReferences.get(info.id).add(index);
}
public synchronized void deleteRecord(final long id) {
for (Integer i : deleteReferences.get(id)) {
committedRecords.remove(i);
}
}
public int getNoRecords() {
return committedRecords.size();
}
@Override
public void failedTransaction(final long transactionID,
final List<RecordInfo> records,
final List<RecordInfo> recordsToDelete) {
if (failureCallback != null) {
failureCallback.failedTransaction(transactionID, records, recordsToDelete);
}
}
public long getMaxId() {
return maxId;
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalTransaction;
public class JDBCJournalReaderCallback implements JournalReaderCallback {
private final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
private final LoaderCallback loadManager;
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<Long, JournalTransaction>();
public JDBCJournalReaderCallback(LoaderCallback loadManager) {
this.loadManager = loadManager;
}
public void onReadAddRecord(final RecordInfo info) throws Exception {
loadManager.addRecord(info);
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
loadManager.updateRecord(info);
}
public void onReadDeleteRecord(final long recordID) throws Exception {
loadManager.deleteRecord(recordID);
}
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
onReadAddRecordTX(transactionID, info);
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordInfos.add(info);
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordsToDelete.add(info);
}
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.prepared = true;
tx.extraData = extraData;
}
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx != null) {
// JournalTransaction journalTransaction = transactions.remove(transactionID);
// if (journalTransaction == null)
// {
// throw new IllegalStateException("Cannot Commit, tx not found with ID: " + transactionID);
// }
for (RecordInfo txRecord : tx.recordInfos) {
if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord);
}
else {
loadManager.addRecord(txRecord);
}
}
for (RecordInfo deleteValue : tx.recordsToDelete) {
loadManager.deleteRecord(deleteValue.id);
}
}
}
public void onReadRollbackRecord(final long transactionID) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx == null) {
throw new IllegalStateException("Cannot rollback, tx not found with ID: " + transactionID);
}
}
@Override
public void markAsDataFile(JournalFile file) {
// Not needed for JDBC journal impl
}
}

View File

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
public class JDBCJournalRecord {
/*
Database Table Schema:
id BIGINT (long)
recordType SMALLINT (byte)
compactCount SMALLINT (byte)
txId BIGINT (long)
userRecordType SMALLINT (byte)
variableSize INT (int)
record BLOB (InputStream)
txDataSize INT (int)
txData BLOB (InputStream)
txCheckNoRecords INT (int)
*/
// Record types taken from Journal Impl
public static final byte ADD_RECORD = 11;
public static final byte UPDATE_RECORD = 12;
public static final byte ADD_RECORD_TX = 13;
public static final byte UPDATE_RECORD_TX = 14;
public static final byte DELETE_RECORD_TX = 15;
public static final byte DELETE_RECORD = 16;
public static final byte PREPARE_RECORD = 17;
public static final byte COMMIT_RECORD = 18;
public static final byte ROLLBACK_RECORD = 19;
// Callback and sync operations
private IOCompletion ioCompletion = null;
private boolean storeLineUp = false;
private boolean sync = false;
// DB Fields for all records
private Long id;
private byte recordType;
private byte compactCount;
private long txId;
// DB fields for ADD_RECORD(TX), UPDATE_RECORD(TX),
private int variableSize;
protected byte userRecordType;
private InputStream record;
// DB Fields for PREPARE_RECORD
private int txDataSize;
private InputStream txData;
// DB Fields for COMMIT_RECORD and PREPARE_RECORD
private int txCheckNoRecords;
private boolean isUpdate;
public JDBCJournalRecord(long id, byte recordType) {
this.id = id;
this.recordType = recordType;
this.isUpdate = recordType == UPDATE_RECORD || recordType == UPDATE_RECORD_TX;
// set defaults
compactCount = 0;
txId = 0;
variableSize = 0;
userRecordType = -1;
record = new ByteArrayInputStream(new byte[0]);
txDataSize = 0;
txData = new ByteArrayInputStream(new byte[0]);
txCheckNoRecords = 0;
}
public static String createTableSQL(String tableName) {
return "CREATE TABLE " + tableName + "(id BIGINT, " + "recordType SMALLINT, " + "compactCount SMALLINT, " + "txId BIGINT, " + "userRecordType SMALLINT, " + "variableSize INTEGER, " + "record BLOB, " + "txDataSize INTEGER, " + "txData BLOB, " + "txCheckNoRecords INTEGER)";
}
public static String insertRecordsSQL(String tableName) {
return "INSERT INTO " + tableName + "(id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords) " + "VALUES (?,?,?,?,?,?,?,?,?,?)";
}
public static String selectRecordsSQL(String tableName) {
return "SELECT id," + "recordType," + "compactCount," + "txId," + "userRecordType," + "variableSize," + "record," + "txDataSize," + "txData," + "txCheckNoRecords " + "FROM " + tableName;
}
public static String deleteRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE id = ?";
}
public static String deleteTxRecordsSQL(String tableName) {
return "DELETE FROM " + tableName + " WHERE txId = ?";
}
public void complete(boolean success) {
if (ioCompletion != null) {
if (success) {
ioCompletion.done();
}
else {
ioCompletion.onError(1, "DATABASE INSERT FAILED");
}
}
}
public void storeLineUp() {
if (storeLineUp && ioCompletion != null) {
ioCompletion.storeLineUp();
}
}
protected void writeRecord(PreparedStatement statement) throws SQLException {
statement.setLong(1, id);
statement.setByte(2, recordType);
statement.setByte(3, compactCount);
statement.setLong(4, txId);
statement.setByte(5, userRecordType);
statement.setInt(6, variableSize);
statement.setBlob(7, record);
statement.setInt(8, txDataSize);
statement.setBlob(9, txData);
statement.setInt(10, txCheckNoRecords);
statement.addBatch();
}
protected void writeDeleteTxRecord(PreparedStatement deleteTxStatement) throws SQLException {
deleteTxStatement.setLong(1, txId);
deleteTxStatement.addBatch();
}
protected void writeDeleteRecord(PreparedStatement deleteStatement) throws SQLException {
deleteStatement.setLong(1, id);
deleteStatement.addBatch();
}
public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2));
record.setCompactCount((byte) rs.getShort(3));
record.setTxId(rs.getLong(4));
record.setUserRecordType((byte) rs.getShort(5));
record.setVariableSize(rs.getInt(6));
record.setRecord(rs.getBytes(7));
record.setTxDataSize(rs.getInt(8));
record.setTxData(rs.getBytes(9));
record.setTxCheckNoRecords(rs.getInt(10));
return record;
}
public IOCompletion getIoCompletion() {
return ioCompletion;
}
public void setIoCompletion(IOCompletion ioCompletion) {
this.ioCompletion = ioCompletion;
}
public boolean isStoreLineUp() {
return storeLineUp;
}
public void setStoreLineUp(boolean storeLineUp) {
this.storeLineUp = storeLineUp;
}
public boolean isSync() {
return sync;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public Long getId() {
return id;
}
public byte getRecordType() {
return recordType;
}
public byte getCompactCount() {
return compactCount;
}
public void setCompactCount(byte compactCount) {
this.compactCount = compactCount;
}
public long getTxId() {
return txId;
}
public void setTxId(long txId) {
this.txId = txId;
}
public int getVariableSize() {
return variableSize;
}
public void setVariableSize(int variableSize) {
this.variableSize = variableSize;
}
public byte getUserRecordType() {
return userRecordType;
}
public void setUserRecordType(byte userRecordType) {
this.userRecordType = userRecordType;
}
public void setRecord(byte[] record) {
this.variableSize = record.length;
this.record = new ByteArrayInputStream(record);
}
public void setRecord(InputStream record) {
this.record = record;
}
public void setRecord(EncodingSupport record) {
this.variableSize = record.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
record.encode(encodedBuffer);
this.record = new ActiveMQBufferInputStream(encodedBuffer);
}
public InputStream getRecord() {
return record;
}
public int getTxCheckNoRecords() {
return txCheckNoRecords;
}
public void setTxCheckNoRecords(int txCheckNoRecords) {
this.txCheckNoRecords = txCheckNoRecords;
}
public void setTxDataSize(int txDataSize) {
this.txDataSize = txDataSize;
}
public int getTxDataSize() {
return txDataSize;
}
public InputStream getTxData() {
return txData;
}
public void setTxData(InputStream record) {
this.record = record;
}
public void setTxData(EncodingSupport txData) {
this.txDataSize = txData.getEncodeSize();
ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize);
txData.encode(encodedBuffer);
this.txData = new ActiveMQBufferInputStream(encodedBuffer);
}
public void setTxData(byte[] txData) {
this.txDataSize = txData.length;
this.txData = new ByteArrayInputStream(txData);
}
public boolean isUpdate() {
return isUpdate;
}
public byte[] getRecordData() throws IOException {
byte[] data = new byte[variableSize];
record.read(data);
return data;
}
public byte[] getTxDataAsByteArray() throws IOException {
byte[] data = new byte[txDataSize];
txData.read(data);
return data;
}
public RecordInfo toRecordInfo() throws IOException {
return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.sql.SQLException;
import java.util.TimerTask;
public class JDBCJournalSync extends TimerTask {
private final JDBCJournalImpl journal;
public JDBCJournalSync(JDBCJournalImpl journal) {
this.journal = journal;
}
@Override
public void run() {
try {
journal.sync();
}
catch (SQLException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jdbc.store.journal;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.core.journal.RecordInfo;
final class TransactionHolder {
public TransactionHolder(final long id) {
transactionID = id;
}
public final long transactionID;
public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public boolean prepared;
public boolean invalid;
public byte[] extraData;
}