ARTEMIS-1114 Missing records after compacting

This is fixing an issue introduced on 4b47461f03a607b9ef517beb2a1666ffae43a2a7 (ARTEMIS-822)
The Transactions were being looked up without the readLock and some of the controls for Read and Write lock
were broken after this.

(cherry picked from commit ddacda50626ef2cd5ccf74a3149eccbbda4a9d84)
This commit is contained in:
Clebert Suconic 2017-04-13 16:53:53 -04:00
parent f07e592a66
commit ec9615a01a
11 changed files with 519 additions and 172 deletions

View File

@ -17,63 +17,55 @@
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleFuture<V> implements Future<V> {
public interface SimpleFuture<V> extends Future<V> {
public SimpleFuture() {
}
SimpleFuture dumb = new SimpleFuture() {
@Override
public void fail(Throwable e) {
V value;
Exception exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
public void fail(Exception e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
}
return value;
@Override
public void set(Object o) {
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
};
static SimpleFuture dumb() {
return dumb;
}
public void set(V v) {
this.value = v;
latch.countDown();
}
void fail(Throwable e);
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
void set(V v);
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleFutureImpl<V> implements SimpleFuture<V> {
public SimpleFutureImpl() {
}
V value;
Throwable exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
@Override
public void fail(Throwable e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
}
return value;
}
@Override
public void set(V v) {
this.value = v;
latch.countDown();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
}

View File

@ -29,7 +29,7 @@ public class SimpleFutureTest {
@Test
public void testFuture() throws Exception {
final long randomStart = System.currentTimeMillis();
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
Thread t = new Thread() {
@Override
public void run() {
@ -44,7 +44,7 @@ public class SimpleFutureTest {
@Test
public void testException() throws Exception {
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
final SimpleFuture<Long> simpleFuture = new SimpleFutureImpl<>();
Thread t = new Thread() {
@Override
public void run() {

View File

@ -213,13 +213,10 @@ abstract class JournalBase implements Journal {
abstract void scheduleReclaim();
protected SyncIOCompletion getSyncCallback(final boolean sync) {
if (supportsCallback) {
if (sync) {
return new SimpleWaitIOCallback();
}
return DummyCallback.getInstance();
if (sync) {
return new SimpleWaitIOCallback();
}
return null;
return DummyCallback.getInstance();
}
private static final class NullEncoding implements EncodingSupport {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -138,10 +139,16 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* This methods informs the Compactor about the existence of a pending (non committed) transaction
*/
public void addPendingTransaction(final long transactionID, final long[] ids) {
if (logger.isTraceEnabled()) {
logger.trace("addPendingTransaction::tx=" + transactionID + ", ids=" + Arrays.toString(ids));
}
pendingTransactions.put(transactionID, new PendingTransaction(ids));
}
public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandCommit " + liveTransaction.getId());
}
pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
long[] ids = liveTransaction.getPositiveArray();
@ -169,6 +176,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
}
public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandRollback " + liveTransaction + " currentFile " + currentFile);
}
pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
}
@ -177,6 +187,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* @param usedFile
*/
public void addCommandDelete(final long id, final JournalFile usedFile) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandDelete id " + id + " usedFile " + usedFile);
}
pendingCommands.add(new DeleteCompactCommand(id, usedFile));
}
@ -185,6 +198,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
* @param usedFile
*/
public void addCommandUpdate(final long id, final JournalFile usedFile, final int size) {
if (logger.isTraceEnabled()) {
logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size);
}
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
}
@ -240,6 +256,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
*/
public void replayPendingCommands() {
for (CompactCommand command : pendingCommands) {
if (logger.isTraceEnabled()) {
logger.trace("Replay " + command);
}
try {
command.execute();
} catch (Exception e) {
@ -255,6 +274,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Read Record " + info);
}
if (lookupRecord(info.id)) {
JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short) (info.compactCount + 1));
@ -269,6 +291,9 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Read Add Recprd TX " + transactionID + " info " + info);
}
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -287,6 +312,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadCommitRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) {
// Sanity check, this should never happen
ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompacting(transactionID);
@ -306,6 +335,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadDeleteRecord(final long recordID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadDeleteRecord " + recordID);
}
if (newRecords.get(recordID) != null) {
// Sanity check, it should never happen
ActiveMQJournalLogger.LOGGER.inconsistencyDuringCompactingDelete(recordID);
@ -315,6 +348,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadDeleteRecordTX " + transactionID + " info " + info);
}
if (pendingTransactions.get(transactionID) != null) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -338,6 +375,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadPrepareRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -355,6 +396,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadRollbackRecord(final long transactionID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadRollbackRecord " + transactionID);
}
if (pendingTransactions.get(transactionID) != null) {
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@ -377,6 +422,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadUpdateRecord " + info);
}
if (lookupRecord(info.id)) {
JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data));
@ -398,6 +447,10 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("onReadUpdateRecordTX " + info);
}
if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) {
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@ -422,8 +475,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
private JournalTransaction getNewJournalTransaction(final long transactionID) {
JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null) {
if (logger.isTraceEnabled()) {
logger.trace("creating new journal Transaction " + transactionID);
}
newTransaction = new JournalTransaction(transactionID, this);
newTransactions.put(transactionID, newTransaction);
} else if (logger.isTraceEnabled()) {
// just logging
logger.trace("reusing TX " + transactionID);
}
return newTransaction;
}
@ -484,6 +544,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
JournalRecord updateRecord = journal.getRecords().get(id);
updateRecord.addUpdateFile(usedFile, size);
}
@Override
public String toString() {
return "UpdateCompactCommand{" +
"id=" + id +
", usedFile=" + usedFile +
", size=" + size +
'}';
}
}
private class CommitCompactCommand extends CompactCommand {
@ -509,6 +578,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
}
newTransactions.remove(liveTransaction.getId());
}
@Override
public String toString() {
return "CommitCompactCommand{" +
"commitFile=" + commitFile +
", liveTransaction=" + liveTransaction +
'}';
}
}
private class RollbackCompactCommand extends CompactCommand {
@ -534,6 +611,14 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
}
newTransactions.remove(liveTransaction.getId());
}
@Override
public String toString() {
return "RollbackCompactCommand{" +
"liveTransaction=" + liveTransaction +
", rollbackFile=" + rollbackFile +
'}';
}
}
@Override

View File

@ -78,6 +78,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.jboss.logging.Logger;
/**
@ -619,6 +620,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// At this point everything is checked. So we relax and just load
// the data now.
if (logger.isTraceEnabled()) {
logger.trace("reading " + recordID + ", userRecordType=" + userRecordType + ", compactCount=" + compactCount);
}
switch (recordType) {
case ADD_RECORD: {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount));
@ -720,6 +725,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
pendingRecords.add(id);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@ -739,13 +751,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
", usedFile = " +
usedFile);
}
if (result != null) {
result.set(true);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
result.set(true);
} catch (Throwable e) {
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendAddRecord::" + e, e);
} finally {
pendingRecords.remove(id);
@ -754,9 +763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
if (result != null) {
result.get();
}
result.get();
}
@Override
@ -769,6 +776,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
checkKnownRecordID(id);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@ -796,13 +809,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
if (result != null) {
result.set(true);
}
result.set(true);
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
result.fail(e);
setErrorCondition(callback, null, e);
logger.error("appendUpdateRecord:" + e, e);
} finally {
journalLock.readLock().unlock();
@ -810,13 +820,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
if (result != null) {
result.get();
}
result.get();
}
@Override
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecord::id=" + id);
}
checkJournalIsLoaded();
lineUpContext(callback);
checkKnownRecordID(id);
@ -846,13 +860,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} else {
record.delete(usedFile);
}
if (result != null) {
result.set(true);
}
result.set(true);
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
result.fail(e);
logger.error("appendDeleteRecord:" + e, e);
} finally {
journalLock.readLock().unlock();
@ -860,13 +870,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
if (result != null) {
result.get();
}
result.get();
}
private static SimpleFuture<Boolean> newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFuture<>() : null;
private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) {
return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb();
}
@Override
@ -875,16 +883,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final byte recordType,
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record);
}
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try {
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
@ -902,7 +922,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) {
logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(tx, e);
setErrorCondition(null, tx, e);
} finally {
journalLock.readLock().unlock();
}
@ -915,7 +935,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
return;
}
final SimpleFuture<Boolean> known = new SimpleFuture<>();
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
// retry on the append thread. maybe the appender thread is not keeping up.
appendExecutor.execute(new Runnable() {
@ -953,17 +973,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final long id,
final byte recordType,
final EncodingSupport record) throws Exception {
if ( logger.isTraceEnabled() ) {
logger.trace( "scheduling appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record);
}
checkJournalIsLoaded();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try {
tx.checkErrorCondition();
JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
@ -982,7 +1012,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) {
logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e );
setErrorCondition( tx, e );
setErrorCondition(null, tx, e );
} finally {
journalLock.readLock().unlock();
}
@ -994,16 +1024,26 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendDeleteRecordTransactional(final long txID,
final long id,
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendDeleteRecordTransactional::txID=" + txID +
", id=" +
id);
}
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
checkJournalIsLoaded();
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try {
if (tx != null) {
tx.checkErrorCondition();
}
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
@ -1019,7 +1059,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id);
} catch (Exception e) {
logger.error("appendDeleteRecordTransactional:" + e, e);
setErrorCondition(tx, e);
setErrorCondition(null, tx, e);
} finally {
journalLock.readLock().unlock();
}
@ -1046,16 +1086,22 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded();
lineUpContext(callback);
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendPrepareRecord::txID=" + txID);
}
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
try {
tx.checkErrorCondition();
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
@ -1064,23 +1110,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
tx.prepare(usedFile);
if (result != null) {
result.set(true);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
result.fail(e);
logger.error("appendPrepareRecord:" + e, e);
setErrorCondition(tx, e);
setErrorCondition(callback, tx, e);
} finally {
journalLock.readLock().unlock();
result.set(tx);
}
}
});
if (result != null) {
result.get();
JournalTransaction tx = result.get();
if (tx != null) {
tx.checkErrorCondition();
}
}
@ -1092,12 +1134,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
private void setErrorCondition(JournalTransaction jt, Throwable t) {
private void setErrorCondition(IOCallback otherCallback, JournalTransaction jt, Throwable t) {
TransactionCallback callback = null;
if (jt != null) {
TransactionCallback callback = jt.getCurrentCallback();
callback = jt.getCurrentCallback();
if (callback != null && callback.getErrorMessage() != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
}
}
if (otherCallback != null && otherCallback != callback) {
otherCallback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
}
}
@ -1114,46 +1162,49 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
lineUpContext(callback);
}
final JournalTransaction tx = transactions.remove(txID);
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendCommitRecord::txID=" + txID );
}
tx.checkErrorCondition();
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
JournalTransaction txcheck = transactions.get(txID);
if (txcheck != null) {
txcheck.checkErrorCondition();
}
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
// cannot remove otherwise compact may get lost
final JournalTransaction tx = transactions.remove(txID);
try {
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
}
tx.commit(usedFile);
if (result != null) {
result.set(true);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
} catch (Throwable e) {
result.fail(e);
logger.error("appendCommitRecord:" + e, e);
setErrorCondition(tx, e);
setErrorCondition(callback, tx, e);
} finally {
journalLock.readLock().unlock();
result.set(tx);
}
}
});
if (result != null) {
result.get();
JournalTransaction tx = result.get();
if (tx != null) {
tx.checkErrorCondition();
}
}
@ -1163,40 +1214,47 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
checkJournalIsLoaded();
lineUpContext(callback);
final JournalTransaction tx = transactions.remove(txID);
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendRollbackRecord::txID=" + txID );
}
tx.checkErrorCondition();
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
final SimpleFuture<JournalTransaction> result = newSyncAndCallbackResult(sync, callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
final JournalTransaction tx = transactions.remove(txID);
try {
if (logger.isTraceEnabled()) {
logger.trace("appendRollbackRecord::txID=" + txID );
}
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
if (result != null) {
result.set(true);
}
} catch (Exception e) {
if (result != null) {
result.fail(e);
}
} catch (Throwable e) {
result.fail(e);
logger.error("appendRollbackRecord:" + e, e);
setErrorCondition(tx, e);
setErrorCondition(callback, tx, e);
} finally {
journalLock.readLock().unlock();
result.set(tx);
}
}
});
if (result != null) {
result.get();
JournalTransaction tx = result.get();
if (tx != null) {
tx.checkErrorCondition();
}
}
@ -1541,6 +1599,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
} finally {
compactorLock.writeLock().unlock();
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact finishing");
}
}
}
@ -2579,7 +2642,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
callback = txcallback;
} else {
callback = null;
callback = parameterCallback;
}
// We need to add the number of records on currentFile if prepare or commit
@ -2626,19 +2689,24 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
private JournalTransaction getTransactionInfo(final long txID) {
JournalTransaction tx = transactions.get(txID);
journalLock.readLock().lock();
try {
JournalTransaction tx = transactions.get(txID);
if (tx == null) {
tx = new JournalTransaction(txID, this);
if (tx == null) {
tx = new JournalTransaction(txID, this);
JournalTransaction trans = transactions.putIfAbsent(txID, tx);
JournalTransaction trans = transactions.putIfAbsent(txID, tx);
if (trans != null) {
tx = trans;
if (trans != null) {
tx = trans;
}
}
}
return tx;
return tx;
} finally {
journalLock.readLock().unlock();
}
}
/**

View File

@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
import org.jboss.logging.Logger;
public class JournalTransaction {
private static final Logger logger = Logger.getLogger(JournalTransaction.class);
private JournalRecordProvider journal;
private List<JournalUpdate> pos;
@ -229,10 +232,17 @@ public class JournalTransaction {
public void commit(final JournalFile file) {
JournalCompactor compactor = journal.getCompactor();
// The race lies here....
if (compacting && compactor != null) {
if (logger.isTraceEnabled()) {
logger.trace("adding tx " + this.id + " into compacting");
}
compactor.addCommandCommit(this, file);
} else {
if (logger.isTraceEnabled()) {
logger.trace("no compact commit " + this.id);
}
if (pos != null) {
for (JournalUpdate trUpdate : pos) {
JournalRecord posFiles = journal.getRecords().get(trUpdate.id);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.Configuration;
@ -53,12 +54,15 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class NIOJournalCompactTest extends JournalImplTestBase {
private static final Logger logger = Logger.getLogger(NIOJournalCompactTest.class);
private static final int NUMBER_OF_RECORDS = 1000;
IDGenerator idGenerator = new SimpleIDGenerator(100000);
@ -782,6 +786,97 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
loadAndCheck();
}
@Test
public void testLoopStressAppends() throws Exception {
for (int i = 0; i < 10; i++) {
logger.info("repetition " + i);
testStressAppends();
tearDown();
setUp();
}
}
@Test
public void testStressAppends() throws Exception {
setup(2, 60 * 1024, true);
final int NUMBER_OF_RECORDS = 200;
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
journal.setAutoReclaim(false);
startJournal();
load();
AtomicBoolean running = new AtomicBoolean(true);
Thread t = new Thread() {
@Override
public void run() {
while (running.get()) {
journal.testCompact();
}
}
};
t.start();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long tx = idGen.generateID();
addTx(tx, idGen.generateID());
LockSupport.parkNanos(1000);
commit(tx);
}
running.set(false);
t.join(50000);
if (t.isAlive()) {
t.interrupt();
Assert.fail("supposed to join thread");
}
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
@Test
public void testSimpleCommitCompactInBetween() throws Exception {
setup(2, 60 * 1024, false);
final int NUMBER_OF_RECORDS = 1;
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
createJournal();
journal.setAutoReclaim(false);
startJournal();
load();
for (int i = 0; i < NUMBER_OF_RECORDS; i++) {
long tx = idGen.generateID();
addTx(tx, idGen.generateID());
journal.testCompact();
journal.testCompact();
journal.testCompact();
journal.testCompact();
logger.info("going to commit");
commit(tx);
}
stopJournal();
createJournal();
startJournal();
loadAndCheck();
}
@Test
public void testCompactAddAndUpdateFollowedByADelete2() throws Exception {
@ -917,8 +1012,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.testCompact();
System.out.println("Debug after compact\n" + journal.debug());
stopJournal();
createJournal();
startJournal();
@ -1666,10 +1759,14 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
survivingMsgs.add(message.getMessageID());
logger.info("Going to store " + message);
// This one will stay here forever
storage.storeMessage(message);
logger.info("message storeed " + message);
logger.info("Going to commit " + tx);
storage.commit(tx);
logger.info("Commited " + tx);
ctx.executeOnCompletion(new IOCallback() {
@Override
@ -1749,6 +1846,8 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
assertTrue("ioexecutor failed to terminate", ioexecutor.awaitTermination(30, TimeUnit.SECONDS));
Assert.assertEquals(0, errors.get());
} catch (Throwable e) {
e.printStackTrace();
throw e;

View File

@ -371,7 +371,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals(0, transactions.size());
try {
journalImpl.appendCommitRecord(1L, false);
journalImpl.appendCommitRecord(1L, true);
// This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload).
Assert.fail("Supposed to throw exception");
@ -419,7 +419,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
Assert.assertEquals((Long) 78L, incompleteTransactions.get(1));
try {
journalImpl.appendCommitRecord(77L, false);
journalImpl.appendCommitRecord(77L, true);
// This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload).
Assert.fail("Supposed to throw exception");

View File

@ -138,6 +138,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
try {
journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
journalImpl.appendCommitRecord(1L, true);
Assert.fail("Exception expected");
// An exception already happened in one of the elements on this transaction.
// We can't accept any more elements on the transaction

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@ -36,12 +37,15 @@ import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
public abstract class JournalImplTestBase extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(JournalImplTestBase.class);
protected List<RecordInfo> records = new LinkedList<>();
protected TestableJournal journal;
@ -156,13 +160,11 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
@Override
public void onCompactDone() {
latchDone.countDown();
System.out.println("Waiting on Compact");
try {
latchWait.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiting on Compact Done");
}
};
@ -520,19 +522,31 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
* @param actual
*/
protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual) {
System.out.println("***********************************************");
System.out.println("Expected list:");
for (RecordInfo info : expected) {
System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
HashSet<RecordInfo> expectedSet = new HashSet<>();
expectedSet.addAll(expected);
Assert.assertEquals("There are duplicated on the expected list", expectedSet.size(), expected.size());
HashSet<RecordInfo> actualSet = new HashSet<>();
actualSet.addAll(actual);
expectedSet.removeAll(actualSet);
for (RecordInfo info: expectedSet) {
logger.warn("The following record is missing:: " + info);
}
if (actual != null) {
System.out.println("***********************************************");
System.out.println("Actual list:");
for (RecordInfo info : actual) {
System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
}
}
System.out.println("***********************************************");
Assert.assertEquals("There are duplicates on the actual list", actualSet.size(), actualSet.size());
RecordInfo[] expectedArray = expected.toArray(new RecordInfo[expected.size()]);
RecordInfo[] actualArray = actual.toArray(new RecordInfo[actual.size()]);
Assert.assertArrayEquals(expectedArray, actualArray);
}
protected byte[] generateRecord(final int length) {