ARTEMIS-1114 Missing records after compacting

This is fixing an issue introduced on 4b47461f03 (ARTEMIS-822)
The Transactions were being looked up without the readLock and some of the controls for Read and Write lock
were broken after this.
This commit is contained in:
Clebert Suconic 2017-04-13 16:53:53 -04:00
parent ec161fc157
commit ddacda5062
11 changed files with 520 additions and 172 deletions

View File

@ -17,63 +17,55 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
public class SimpleFuture<V> implements Future<V> { public interface SimpleFuture<V> extends Future<V> {
public SimpleFuture() { SimpleFuture dumb = new SimpleFuture() {
} @Override
public void fail(Throwable e) {
V value;
Exception exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
public void fail(Exception e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
} }
return value;
@Override
public void set(Object o) {
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
};
static SimpleFuture dumb() {
return dumb;
} }
public void set(V v) { void fail(Throwable e);
this.value = v;
latch.countDown();
}
@Override void set(V v);
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
} }

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleFutureImpl<V> implements SimpleFuture<V> {
public SimpleFutureImpl() {
}
V value;
Throwable exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
@Override
public void fail(Throwable e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
}
return value;
}
@Override
public void set(V v) {
this.value = v;
latch.countDown();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
}

View File

@ -29,7 +29,7 @@ public class SimpleFutureTest {
@Test @Test
public void testFuture() throws Exception { public void testFuture() throws Exception {
final long randomStart = System.currentTimeMillis(); final long randomStart = System.currentTimeMillis();
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>(); final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
@ -44,7 +44,7 @@ public class SimpleFutureTest {
@Test @Test
public void testException() throws Exception { public void testException() throws Exception {
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>(); final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {

View File

@ -162,13 +162,10 @@ abstract class JournalBase implements Journal {
abstract void scheduleReclaim(); abstract void scheduleReclaim();
protected SyncIOCompletion getSyncCallback(final boolean sync) { protected SyncIOCompletion getSyncCallback(final boolean sync) {
if (supportsCallback) { if (sync) {
if (sync) { return new SimpleWaitIOCallback();
return new SimpleWaitIOCallback();
}
return DummyCallback.getInstance();
} }
return null; return DummyCallback.getInstance();
} }
private static final class NullEncoding implements EncodingSupport { private static final class NullEncoding implements EncodingSupport {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.journal.impl; package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -139,10 +140,16 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* This methods informs the Compactor about the existence of a pending (non committed) transaction * This methods informs the Compactor about the existence of a pending (non committed) transaction
*/ */
public void addPendingTransaction(final long transactionID, final long[] ids) { public void addPendingTransaction(final long transactionID, final long[] ids) {
if (logger.isTraceEnabled()) {
logger.trace("addPendingTransaction::tx=" + transactionID + ", ids=" + Arrays.toString(ids));
}
pendingTransactions.put(transactionID, new PendingTransaction(ids)); pendingTransactions.put(transactionID, new PendingTransaction(ids));
} }
public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile) { public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandCommit " + liveTransaction.getId());
}
pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile)); pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
long[] ids = liveTransaction.getPositiveArray(); long[] ids = liveTransaction.getPositiveArray();
@ -170,6 +177,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
} }
public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile) { public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandRollback " + liveTransaction + " currentFile " + currentFile);
}
pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile)); pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
} }
@ -178,6 +188,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* @param usedFile * @param usedFile
*/ */
public void addCommandDelete(final long id, final JournalFile usedFile) { public void addCommandDelete(final long id, final JournalFile usedFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandDelete id " + id + " usedFile " + usedFile);
}
pendingCommands.add(new DeleteCompactCommand(id, usedFile)); pendingCommands.add(new DeleteCompactCommand(id, usedFile));
} }
@ -186,6 +199,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* @param usedFile * @param usedFile
*/ */
public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) { public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
}
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size)); pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
} }
@ -241,6 +257,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
*/ */
public void replayPendingCommands() { public void replayPendingCommands() {
for (CompactCommand command : pendingCommands) { for (CompactCommand command : pendingCommands) {
if (logger.isTraceEnabled()) {
logger.trace("Replay " + command);
}
try { try {
command.execute(); command.execute();
} catch (Exception e) { } catch (Exception e) {
@ -256,6 +275,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadAddRecord(final RecordInfo info) throws Exception { public void onReadAddRecord(final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Read Record " + info);
}
if (lookupRecord(info.id)) { if (lookupRecord(info.id)) {
JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short) (info.compactCount + 1)); addRecord.setCompactCount((short) (info.compactCount + 1));
@ -270,6 +292,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception { public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Read Add Recprd TX " + transactionID + " info " + info);
}
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID); JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -288,6 +313,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception { public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadCommitRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) { if (pendingTransactions.get(transactionID) != null) {
// Sanity check, this should never happen // Sanity check, this should never happen
ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID); ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID);
@ -307,6 +336,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadDeleteRecord(final long recordID) throws Exception { public void onReadDeleteRecord(final long recordID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadDeleteRecord " + recordID);
}
if (newRecords.get(recordID) != null) { if (newRecords.get(recordID) != null) {
// Sanity check, it should never happen // Sanity check, it should never happen
ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID); ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID);
@ -316,6 +349,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception { public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadDeleteRecordTX " + transactionID + " info " + info);
}
if (pendingTransactions.get(transactionID) != null) { if (pendingTransactions.get(transactionID) != null) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID); JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -339,6 +376,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
public void onReadPrepareRecord(final long transactionID, public void onReadPrepareRecord(final long transactionID,
final byte[] extraData, final byte[] extraData,
final int numberOfRecords) throws Exception { final int numberOfRecords) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadPrepareRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) { if (pendingTransactions.get(transactionID) != null) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID); JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -356,6 +397,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadRollbackRecord(final long transactionID) throws Exception { public void onReadRollbackRecord(final long transactionID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadRollbackRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) { if (pendingTransactions.get(transactionID) != null) {
// Sanity check, this should never happen // Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID + throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@ -378,6 +423,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception { public void onReadUpdateRecord(final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadUpdateRecord " + info);
}
if (lookupRecord(info.id)) { if (lookupRecord(info.id)) {
JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data));
@ -399,6 +448,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override @Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception { public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadUpdateRecordTX " + info);
}
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID); JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -423,8 +476,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private JournalTransaction getNewJournalTransaction(final long transactionID) { private JournalTransaction getNewJournalTransaction(final long transactionID) {
JournalTransaction newTransaction = newTransactions.get(transactionID); JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null) { if (newTransaction == null) {
if (logger.isTraceEnabled()) {
logger.trace("creating new journal Transaction " + transactionID);
}
newTransaction = new JournalTransaction(transactionID, this); newTransaction = new JournalTransaction(transactionID, this);
newTransactions.put(transactionID, newTransaction); newTransactions.put(transactionID, newTransaction);
} else if (logger.isTraceEnabled()) {
// just logging
logger.trace("reusing TX " + transactionID);
} }
return newTransaction; return newTransaction;
} }
@ -485,6 +545,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
JournalRecord updateRecord = journal.getRecords().get(id); JournalRecord updateRecord = journal.getRecords().get(id);
updateRecord.addUpdateFile(usedFile, size); updateRecord.addUpdateFile(usedFile, size);
} }
@Override
public String toString() {
return "UpdateCompactCommand{" +
"id=" + id +
", usedFile=" + usedFile +
", size=" + size +
'}';
}
} }
private class CommitCompactCommand extends CompactCommand { private class CommitCompactCommand extends CompactCommand {
@ -510,6 +579,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
} }
newTransactions.remove(liveTransaction.getId()); newTransactions.remove(liveTransaction.getId());
} }
@Override
public String toString() {
return "CommitCompactCommand{" +
"commitFile=" + commitFile +
", liveTransaction=" + liveTransaction +
'}';
}
} }
private class RollbackCompactCommand extends CompactCommand { private class RollbackCompactCommand extends CompactCommand {
@ -535,6 +612,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
} }
newTransactions.remove(liveTransaction.getId()); newTransactions.remove(liveTransaction.getId());
} }
@Override
public String toString() {
return "RollbackCompactCommand{" +
"liveTransaction=" + liveTransaction +
", rollbackFile=" + rollbackFile +
'}';
}
} }
@Override @Override

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
@ -619,6 +620,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// At this point everything is checked. So we relax and just load // At this point everything is checked. So we relax and just load
// the data now. // the data now.
if (logger.isTraceEnabled()) {
logger.trace("reading " + recordID + ", userRecordType=" + userRecordType + ", compactCount=" + compactCount);
}
switch (recordType) { switch (recordType) {
case ADD_RECORD: { case ADD_RECORD: {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount)); reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
@ -721,6 +726,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback); lineUpContext(callback);
pendingRecords.add(id); pendingRecords.add(id);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@ -740,13 +752,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
", usedFile = " + ", usedFile = " +
usedFile); usedFile);
} }
if (result != null) { result.set(true);
result.set(true); } catch (Throwable e) {
} result.fail(e);
} catch (Exception e) { setErrorCondition(callback, null, e);
if (result != null) {
result.fail(e);
}
logger.error("appendAddRecord::" + e, e); logger.error("appendAddRecord::" + e, e);
} finally { } finally {
pendingRecords.remove(id); pendingRecords.remove(id);
@ -755,9 +764,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
}); });
if (result != null) { result.get();
result.get();
}
} }
@Override @Override
@ -771,6 +778,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback); lineUpContext(callback);
checkKnownRecordID(id); checkKnownRecordID(id);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@ -798,13 +811,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
} }
if (result != null) { result.set(true);
result.set(true);
}
} catch (Exception e) { } catch (Exception e) {
if (result != null) { result.fail(e);
result.fail(e); setErrorCondition(callback, null, e);
}
logger.error("appendUpdateRecord:" + e, e); logger.error("appendUpdateRecord:" + e, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
@ -812,13 +822,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
}); });
if (result != null) { result.get();
result.get();
}
} }
@Override @Override
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecord::id=" + id);
}
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
checkKnownRecordID(id); checkKnownRecordID(id);
@ -848,13 +862,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else { } else {
record.delete(usedFile); record.delete(usedFile);
} }
if (result != null) { result.set(true);
result.set(true);
}
} catch (Exception e) { } catch (Exception e) {
if (result != null) { result.fail(e);
result.fail(e);
}
logger.error("appendDeleteRecord:" + e, e); logger.error("appendDeleteRecord:" + e, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
@ -862,13 +872,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
}); });
if (result != null) { result.get();
result.get();
}
} }
private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) { private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFuture<Boolean>() : null; return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
} }
@Override @Override
@ -878,16 +886,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final Persister persister, final Persister persister,
final Object record) throws Exception { final Object record) throws Exception {
checkJournalIsLoaded(); checkJournalIsLoaded();
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record);
}
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try { try {
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
@ -905,7 +925,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize()); tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) { } catch (Exception e) {
logger.error("appendAddRecordTransactional:" + e, e); logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(tx, e); setErrorCondition(null, tx, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
} }
@ -918,7 +938,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return; return;
} }
final SimpleFuture<Boolean> known = new SimpleFuture<>(); final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
// retry on the append thread. maybe the appender thread is not keeping up. // retry on the append thread. maybe the appender thread is not keeping up.
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@ -957,17 +977,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final byte recordType, final byte recordType,
final Persister persister, final Persister persister,
final Object record) throws Exception { final Object record) throws Exception {
if ( logger.isTraceEnabled() ) {
logger.trace( "scheduling appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record);
}
checkJournalIsLoaded(); checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try { try {
tx.checkErrorCondition();
JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record ); JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record );
JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null ); JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
@ -986,7 +1017,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) { } catch ( Exception e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition( tx, e ); setErrorCondition(null, tx, e );
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
} }
@ -998,16 +1029,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendDeleteRecordTransactional(final long txID, public void appendDeleteRecordTransactional(final long txID,
final long id, final long id,
final EncodingSupport record) throws Exception { final EncodingSupport record) throws Exception {
checkJournalIsLoaded(); if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecordTransactional::txID=" + txID +
", id=" +
id);
}
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition(); checkJournalIsLoaded();
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try { try {
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
@ -1023,7 +1064,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id); tx.addNegative(usedFile, id);
} catch (Exception e) { } catch (Exception e) {
logger.error("appendDeleteRecordTransactional:" + e, e); logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(tx, e); setErrorCondition(null, tx, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
} }
@ -1050,16 +1091,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
final JournalTransaction tx = getTransactionInfo(txID); if (logger.isTraceEnabled()) {
tx.checkErrorCondition(); logger.trace("scheduling appendPrepareRecord::txID=" + txID);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try { try {
tx.checkErrorCondition();
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
@ -1068,23 +1115,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
tx.prepare(usedFile); tx.prepare(usedFile);
if (result != null) {
result.set(true);
}
} catch (Exception e) { } catch (Exception e) {
if (result != null) { result.fail(e);
result.fail(e);
}
logger.error("appendPrepareRecord:" + e, e); logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(tx, e); setErrorCondition(callback, tx, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
result.set(tx);
} }
} }
}); });
if (result != null) { JournalTransaction tx = result.get();
result.get(); if (tx != null) {
tx.checkErrorCondition(); tx.checkErrorCondition();
} }
} }
@ -1096,12 +1139,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
} }
private void setErrorCondition(JournalTransaction jt, Throwable t) { private void setErrorCondition(IOCallback otherCallback, JournalTransaction jt, Throwable t) {
TransactionCallback callback = null;
if (jt != null) { if (jt != null) {
TransactionCallback callback = jt.getCurrentCallback(); callback = jt.getCurrentCallback();
if (callback != null && callback.getErrorMessage() != null) { if (callback != null && callback.getErrorMessage() != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage()); callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
} }
}
if (otherCallback != null && otherCallback != callback) {
otherCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
} }
} }
@ -1118,46 +1167,49 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback); lineUpContext(callback);
} }
final JournalTransaction tx = transactions.remove(txID);
if (tx == null) { if (logger.isTraceEnabled()) {
throw new IllegalStateException("Cannot find tx with id " + txID); logger.trace("scheduling appendCommitRecord::txID=" + txID );
} }
tx.checkErrorCondition(); JournalTransaction txcheck = transactions.get(txID);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback); if (txcheck != null) {
txcheck.checkErrorCondition();
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
// cannot remove otherwise compact may get lost
final JournalTransaction tx = transactions.remove(txID);
try { try {
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
}
tx.commit(usedFile); tx.commit(usedFile);
if (result != null) { } catch (Throwable e) {
result.set(true); result.fail(e);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
logger.error("appendCommitRecord:" + e, e); logger.error("appendCommitRecord:" + e, e);
setErrorCondition(tx, e); setErrorCondition(callback, tx, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
result.set(tx);
} }
} }
}); });
if (result != null) { JournalTransaction tx = result.get();
result.get(); if (tx != null) {
tx.checkErrorCondition(); tx.checkErrorCondition();
} }
} }
@ -1167,40 +1219,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded(); checkJournalIsLoaded();
lineUpContext(callback); lineUpContext(callback);
final JournalTransaction tx = transactions.remove(txID);
if (tx == null) { if (logger.isTraceEnabled()) {
throw new IllegalStateException("Cannot find tx with id " + txID); logger.trace("scheduling appendRollbackRecord::txID=" + txID );
} }
tx.checkErrorCondition();
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() { appendExecutor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
journalLock.readLock().lock(); journalLock.readLock().lock();
final JournalTransaction tx = transactions.remove(txID);
try { try {
if (logger.isTraceEnabled()) {
logger.trace("appendRollbackRecord::txID=" + txID );
}
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile); tx.rollback(usedFile);
if (result != null) { } catch (Throwable e) {
result.set(true); result.fail(e);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
logger.error("appendRollbackRecord:" + e, e); logger.error("appendRollbackRecord:" + e, e);
setErrorCondition(tx, e); setErrorCondition(callback, tx, e);
} finally { } finally {
journalLock.readLock().unlock(); journalLock.readLock().unlock();
result.set(tx);
} }
} }
}); });
if (result != null) { JournalTransaction tx = result.get();
result.get(); if (tx != null) {
tx.checkErrorCondition(); tx.checkErrorCondition();
} }
} }
@ -1545,6 +1604,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
} finally { } finally {
compactorLock.writeLock().unlock(); compactorLock.writeLock().unlock();
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact finishing");
}
} }
} }
@ -2544,7 +2608,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
callback = txcallback; callback = txcallback;
} else { } else {
callback = null; callback = parameterCallback;
} }
// We need to add the number of records on currentFile if prepare or commit // We need to add the number of records on currentFile if prepare or commit
@ -2591,19 +2655,24 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} }
private JournalTransaction getTransactionInfo(final long txID) { private JournalTransaction getTransactionInfo(final long txID) {
JournalTransaction tx = transactions.get(txID); journalLock.readLock().lock();
try {
JournalTransaction tx = transactions.get(txID);
if (tx == null) { if (tx == null) {
tx = new JournalTransaction(txID, this); tx = new JournalTransaction(txID, this);
JournalTransaction trans = transactions.putIfAbsent(txID, tx); JournalTransaction trans = transactions.putIfAbsent(txID, tx);
if (trans != null) { if (trans != null) {
tx = trans; tx = trans;
}
} }
}
return tx; return tx;
} finally {
journalLock.readLock().unlock();
}
} }
/** /**

View File

@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.jboss.logging.Logger;
public class JournalTransaction { public class JournalTransaction {
private static final Logger logger = Logger.getLogger(JournalTransaction.class);
private JournalRecordProvider journal; private JournalRecordProvider journal;
private List<JournalUpdate> pos; private List<JournalUpdate> pos;
@ -229,10 +232,17 @@ public class JournalTransaction {
public void commit(final JournalFile file) { public void commit(final JournalFile file) {
JournalCompactor compactor = journal.getCompactor(); JournalCompactor compactor = journal.getCompactor();
// The race lies here....
if (compacting && compactor != null) { if (compacting && compactor != null) {
if (logger.isTraceEnabled()) {
logger.trace("adding tx " + this.id + " into compacting");
}
compactor.addCommandCommit(this, file); compactor.addCommandCommit(this, file);
} else { } else {
if (logger.isTraceEnabled()) {
logger.trace("no compact commit " + this.id);
}
if (pos != null) { if (pos != null) {
for (JournalUpdate trUpdate : pos) { for (JournalUpdate trUpdate : pos) {
JournalRecord posFiles = journal.getRecords().get(trUpdate.id); JournalRecord posFiles = journal.getRecords().get(trUpdate.id);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
@ -53,12 +54,15 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class NIOJournalCompactTest extends JournalImplTestBase { public class NIOJournalCompactTest extends JournalImplTestBase {
private static final Logger logger = Logger.getLogger(NIOJournalCompactTest.class);
private static final int NUMBER_OF_RECORDS = 1000; private static final int NUMBER_OF_RECORDS = 1000;
IDGenerator idGenerator = new SimpleIDGenerator(100000); IDGenerator idGenerator = new SimpleIDGenerator(100000);
@ -782,6 +786,97 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
loadAndCheck(); loadAndCheck();
} }
@Test
public void testLoopStressAppends() throws Exception {
for (int i = 0; i < 10; i++) {
logger.info("repetition " + i);
testStressAppends();
tearDown();
setUp();
}
}
@Test
public void testStressAppends() throws Exception {
setup(2, 60 * 1024, true);
final int NUMBER_OF_RECORDS = 200;
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
journal.setAutoReclaim(false);
startJournal();
load();
AtomicBoolean running = new AtomicBoolean(true);
Thread t = new Thread() {
@Override
public void run() {
while (running.get()) {
journal.testCompact();
}
}
};
t.start();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long tx = idGen.generateID();
addTx(tx, idGen.generateID());
LockSupport.parkNanos(1000);
commit(tx);
}
running.set(false);
t.join(50000);
if (t.isAlive()) {
t.interrupt();
Assert.fail("supposed to join thread");
}
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
@Test
public void testSimpleCommitCompactInBetween() throws Exception {
setup(2, 60 * 1024, false);
final int NUMBER_OF_RECORDS = 1;
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
journal.setAutoReclaim(false);
startJournal();
load();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long tx = idGen.generateID();
addTx(tx, idGen.generateID());
journal.testCompact();
journal.testCompact();
journal.testCompact();
journal.testCompact();
logger.info("going to commit");
commit(tx);
}
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
@Test @Test
public void testCompactAddAndUpdateFollowedByADelete2() throws Exception { public void testCompactAddAndUpdateFollowedByADelete2() throws Exception {
@ -917,8 +1012,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.testCompact(); journal.testCompact();
System.out.println("Debug after compact\n" + journal.debug());
stopJournal(); stopJournal();
createJournal(); createJournal();
startJournal(); startJournal();
@ -1666,10 +1759,14 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
survivingMsgs.add(message.getMessageID()); survivingMsgs.add(message.getMessageID());
logger.info("Going to store " + message);
// This one will stay here forever // This one will stay here forever
storage.storeMessage(message); storage.storeMessage(message);
logger.info("message storeed " + message);
logger.info("Going to commit " + tx);
storage.commit(tx); storage.commit(tx);
logger.info("Commited " + tx);
ctx.executeOnCompletion(new IOCallback() { ctx.executeOnCompletion(new IOCallback() {
@Override @Override
@ -1749,6 +1846,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS)); assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
throw e; throw e;

View File

@ -371,7 +371,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, transactions.size()); Assert.assertEquals(0, transactions.size());
try { try {
journalImpl.appendCommitRecord(1L, false); journalImpl.appendCommitRecord(1L, true);
// This was supposed to throw an exception, as the transaction was // This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload). // forgotten (interrupted by a reload).
Assert.fail("Supposed to throw exception"); Assert.fail("Supposed to throw exception");
@ -419,7 +419,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals((Long) 78L, incompleteTransactions.get(1)); Assert.assertEquals((Long) 78L, incompleteTransactions.get(1));
try { try {
journalImpl.appendCommitRecord(77L, false); journalImpl.appendCommitRecord(77L, true);
// This was supposed to throw an exception, as the transaction was // This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload). // forgotten (interrupted by a reload).
Assert.fail("Supposed to throw exception"); Assert.fail("Supposed to throw exception");

View File

@ -138,6 +138,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
try { try {
journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0)); journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
journalImpl.appendCommitRecord(1L, true);
Assert.fail("Exception expected"); Assert.fail("Exception expected");
// An exception already happened in one of the elements on this transaction. // An exception already happened in one of the elements on this transaction.
// We can't accept any more elements on the transaction // We can't accept any more elements on the transaction

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList; import java.util.LinkedList;
@ -36,12 +37,15 @@ import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
public abstract class JournalImplTestBase extends ActiveMQTestBase { public abstract class JournalImplTestBase extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(JournalImplTestBase.class);
protected List<RecordInfo> records = new LinkedList<>(); protected List<RecordInfo> records = new LinkedList<>();
protected TestableJournal journal; protected TestableJournal journal;
@ -156,13 +160,11 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
@Override @Override
public void onCompactDone() { public void onCompactDone() {
latchDone.countDown(); latchDone.countDown();
System.out.println("Waiting on Compact");
try { try {
latchWait.await(); latchWait.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("Waiting on Compact Done");
} }
}; };
@ -520,19 +522,31 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
* @param actual * @param actual
*/ */
protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual) { protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual) {
System.out.println("***********************************************");
System.out.println("Expected list:"); HashSet<RecordInfo> expectedSet = new HashSet<>();
for (RecordInfo info : expected) { expectedSet.addAll(expected);
System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
Assert.assertEquals("There are duplicated on the expected list", expectedSet.size(), expected.size());
HashSet<RecordInfo> actualSet = new HashSet<>();
actualSet.addAll(actual);
expectedSet.removeAll(actualSet);
for (RecordInfo info: expectedSet) {
logger.warn("The following record is missing:: " + info);
} }
if (actual != null) {
System.out.println("***********************************************");
System.out.println("Actual list:"); Assert.assertEquals("There are duplicates on the actual list", actualSet.size(), actualSet.size());
for (RecordInfo info : actual) {
System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
}
} RecordInfo[] expectedArray = expected.toArray(new RecordInfo[expected.size()]);
System.out.println("***********************************************"); RecordInfo[] actualArray = actual.toArray(new RecordInfo[actual.size()]);
Assert.assertArrayEquals(expectedArray, actualArray);
} }
protected byte[] generateRecord(final int length) { protected byte[] generateRecord(final int length) {