parent
85aeac99ab
commit
37d1d5ff7b
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
|||
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Used by the {@link org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager} to replicate journal calls.
|
||||
|
@ -44,11 +45,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
|
|||
*/
|
||||
public class ReplicatedJournal implements Journal {
|
||||
|
||||
private static final boolean trace = false;
|
||||
|
||||
private static void trace(final String message) {
|
||||
System.out.println("ReplicatedJournal::" + message);
|
||||
}
|
||||
private static final Logger log = Logger.getLogger(ReplicatedJournal.class);
|
||||
|
||||
private final ReplicationManager replicationManager;
|
||||
|
||||
|
@ -92,8 +89,8 @@ public class ReplicatedJournal implements Journal {
|
|||
Persister persister,
|
||||
final Object record,
|
||||
final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Append record id = " + id + " recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
|
||||
localJournal.appendAddRecord(id, recordType, persister, record, sync);
|
||||
|
@ -114,8 +111,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final Object record,
|
||||
final boolean sync,
|
||||
final IOCompletion completionCallback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Append record id = " + id + " recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record);
|
||||
localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback);
|
||||
|
@ -151,8 +148,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("Append record TXid = " + id + " recordType = " + recordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Append record TXid = " + id + " recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
|
||||
localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
|
||||
|
@ -166,8 +163,8 @@ public class ReplicatedJournal implements Journal {
|
|||
*/
|
||||
@Override
|
||||
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendCommit " + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendCommit " + txID);
|
||||
}
|
||||
replicationManager.appendCommitRecord(journalID, txID, sync, true);
|
||||
localJournal.appendCommitRecord(txID, sync);
|
||||
|
@ -175,8 +172,8 @@ public class ReplicatedJournal implements Journal {
|
|||
|
||||
@Override
|
||||
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendCommit " + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendCommit " + txID);
|
||||
}
|
||||
replicationManager.appendCommitRecord(journalID, txID, sync, true);
|
||||
localJournal.appendCommitRecord(txID, sync, callback);
|
||||
|
@ -187,8 +184,8 @@ public class ReplicatedJournal implements Journal {
|
|||
boolean sync,
|
||||
IOCompletion callback,
|
||||
boolean lineUpContext) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendCommit " + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendCommit " + txID);
|
||||
}
|
||||
replicationManager.appendCommitRecord(journalID, txID, sync, lineUpContext);
|
||||
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
|
||||
|
@ -202,8 +199,8 @@ public class ReplicatedJournal implements Journal {
|
|||
*/
|
||||
@Override
|
||||
public void appendDeleteRecord(final long id, final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendDelete " + id);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendDelete " + id);
|
||||
}
|
||||
replicationManager.appendDeleteRecord(journalID, id);
|
||||
localJournal.appendDeleteRecord(id, sync);
|
||||
|
@ -213,8 +210,8 @@ public class ReplicatedJournal implements Journal {
|
|||
public void appendDeleteRecord(final long id,
|
||||
final boolean sync,
|
||||
final IOCompletion completionCallback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendDelete " + id);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendDelete " + id);
|
||||
}
|
||||
replicationManager.appendDeleteRecord(journalID, id);
|
||||
localJournal.appendDeleteRecord(id, sync, completionCallback);
|
||||
|
@ -243,8 +240,8 @@ public class ReplicatedJournal implements Journal {
|
|||
public void appendDeleteRecordTransactional(final long txID,
|
||||
final long id,
|
||||
final EncodingSupport record) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendDelete txID=" + txID + " id=" + id);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendDelete txID=" + txID + " id=" + id);
|
||||
}
|
||||
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
|
||||
localJournal.appendDeleteRecordTransactional(txID, id, record);
|
||||
|
@ -258,8 +255,8 @@ public class ReplicatedJournal implements Journal {
|
|||
*/
|
||||
@Override
|
||||
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
|
||||
}
|
||||
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
|
||||
localJournal.appendDeleteRecordTransactional(txID, id);
|
||||
|
@ -288,8 +285,8 @@ public class ReplicatedJournal implements Journal {
|
|||
public void appendPrepareRecord(final long txID,
|
||||
final EncodingSupport transactionData,
|
||||
final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendPrepare txID=" + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendPrepare txID=" + txID);
|
||||
}
|
||||
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
|
||||
localJournal.appendPrepareRecord(txID, transactionData, sync);
|
||||
|
@ -300,8 +297,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final EncodingSupport transactionData,
|
||||
final boolean sync,
|
||||
final IOCompletion callback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendPrepare txID=" + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendPrepare txID=" + txID);
|
||||
}
|
||||
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
|
||||
localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
|
||||
|
@ -315,8 +312,8 @@ public class ReplicatedJournal implements Journal {
|
|||
*/
|
||||
@Override
|
||||
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendRollback " + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendRollback " + txID);
|
||||
}
|
||||
replicationManager.appendRollbackRecord(journalID, txID);
|
||||
localJournal.appendRollbackRecord(txID, sync);
|
||||
|
@ -324,8 +321,8 @@ public class ReplicatedJournal implements Journal {
|
|||
|
||||
@Override
|
||||
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendRollback " + txID);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendRollback " + txID);
|
||||
}
|
||||
replicationManager.appendRollbackRecord(journalID, txID);
|
||||
localJournal.appendRollbackRecord(txID, sync, callback);
|
||||
|
@ -361,8 +358,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final Persister persister,
|
||||
final Object record,
|
||||
final boolean sync) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
|
||||
localJournal.appendUpdateRecord(id, recordType, persister, record, sync);
|
||||
|
@ -375,8 +372,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final Object record,
|
||||
final boolean sync,
|
||||
final IOCompletion completionCallback) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType);
|
||||
}
|
||||
replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
|
||||
localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback);
|
||||
|
@ -412,8 +409,8 @@ public class ReplicatedJournal implements Journal {
|
|||
final byte recordType,
|
||||
final Persister persister,
|
||||
final Object record) throws Exception {
|
||||
if (ReplicatedJournal.trace) {
|
||||
ReplicatedJournal.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
|
||||
}
|
||||
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record);
|
||||
localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record);
|
||||
|
|
Loading…
Reference in New Issue