ARTEMIS-822 Add executor service to JournalImpl for append operations and remove synchronization

https://issues.apache.org/jira/browse/ARTEMIS-822
This commit is contained in:
barreiro 2016-01-22 03:23:26 +00:00 committed by Clebert Suconic
parent bfb9bedb2d
commit 4b47461f03
10 changed files with 622 additions and 345 deletions

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
import org.apache.activemq.artemis.utils.Base64;
@Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract {
long lineNumber = 0;
Map<Long, JournalRecord> journalRecords = journal.getRecords();
while ((line = buffReader.readLine()) != null) {
lineNumber++;
String[] splitLine = line.split(",");
@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract {
counter.incrementAndGet();
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
} else if (operation.equals("AddRecordTX")) {
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
RecordInfo info = parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
} else if (operation.equals("UpdateTX")) {
long txID = parseLong("txID", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract {
} else if (operation.equals("DeleteRecord")) {
long id = parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
if (journalRecords.get(id) != null) {
try {
journal.appendDeleteRecord(id, false);
} catch (IllegalStateException ignored) {
// If not found it means the append/update records were reclaimed already
}
} else if (operation.equals("DeleteRecordTX")) {
long txID = parseLong("txID", lineProperties);
long id = parseLong("id", lineProperties);
AtomicInteger counter = getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
if (journalRecords.get(id) != null) {
journal.appendDeleteRecordTransactional(txID, id);
}
journal.appendDeleteRecordTransactional(txID, id);
} else if (operation.equals("Prepare")) {
long txID = parseLong("txID", lineProperties);
int numberOfRecords = parseInt("numberOfRecords", lineProperties);

View File

@ -22,7 +22,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.jboss.logging.Logger;
/**
@ -104,7 +103,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
// This could happen during shutdowns. Nothing to be concerned about here
logger.debug("Interrupted Thread", e);
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
logger.warn(t.getMessage(), t);
}
task = tasks.poll();
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SimpleFuture<V> implements Future<V> {
public SimpleFuture() {
}
V value;
Exception exception;
private final CountDownLatch latch = new CountDownLatch(1);
boolean canceled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
canceled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled() {
return canceled;
}
@Override
public boolean isDone() {
return latch.getCount() <= 0;
}
public void fail(Exception e) {
this.exception = e;
latch.countDown();
}
@Override
public V get() throws InterruptedException, ExecutionException {
latch.await();
if (this.exception != null) {
throw new ExecutionException(this.exception);
}
return value;
}
public void set(V v) {
this.value = v;
latch.countDown();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
latch.await(timeout, unit);
return value;
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class SimpleFutureTest {
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
@Test
public void testFuture() throws Exception {
final long randomStart = System.currentTimeMillis();
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
Thread t = new Thread() {
@Override
public void run() {
simpleFuture.set(randomStart);
}
};
t.start();
Assert.assertEquals(randomStart, simpleFuture.get().longValue());
}
@Test
public void testException() throws Exception {
final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
Thread t = new Thread() {
@Override
public void run() {
simpleFuture.fail(new Exception("hello"));
}
};
t.start();
boolean failed = false;
try {
simpleFuture.get();
} catch (Exception e) {
failed = true;
}
Assert.assertTrue(failed);
}
}

View File

@ -29,11 +29,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -45,6 +47,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@ -160,6 +163,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
@ -172,12 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private ExecutorService compactorExecutor = null;
private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
private ExecutorService appendExecutor = null;
// Lock used during the append of records
// This lock doesn't represent a global lock.
// After a record is appended, the usedFile can't be changed until the positives and negatives are updated
private final Object lockAppend = new Object();
private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
@ -688,32 +690,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
pendingRecords.add(id);
journalLock.readLock().lock();
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
try {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
if (callback != null) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
}
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
pendingRecords.remove(id);
journalLock.readLock().unlock();
}
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
} finally {
journalLock.readLock().unlock();
});
if (sync && callback == null) {
result.get();
}
}
@ -724,94 +731,86 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
checkKnownRecordID(id);
journalLock.readLock().lock();
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalRecord jrnRecord = records.get(id);
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
try {
JournalRecord jrnRecord = records.get(id);
if (logger.isTraceEnabled()) {
logger.trace("appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType +
", usedFile = " +
usedFile);
}
if (jrnRecord == null) {
if (!(compactor != null && compactor.lookupRecord(id))) {
throw new IllegalStateException("Cannot find add info " + id);
// record==null here could only mean there is a compactor
// computing the delete should be done after compacting is done
if (jrnRecord == null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
} else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
journalLock.readLock().unlock();
}
}
});
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
if (callback != null) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendUpdateRecord::id=" + id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
}
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null) {
compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
} else {
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
} finally {
journalLock.readLock().unlock();
if (sync && callback == null) {
result.get();
}
}
@Override
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
checkKnownRecordID(id);
journalLock.readLock().lock();
try {
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalRecord record = null;
if (compactor == null) {
record = records.remove(id);
}
JournalRecord record = null;
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
if (compactor == null) {
record = records.remove(id);
if (logger.isTraceEnabled()) {
logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
}
if (record == null) {
throw new IllegalStateException("Cannot find add info " + id);
}
} else {
if (!records.containsKey(id) && !compactor.lookupRecord(id)) {
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
// record==null here could only mean there is a compactor
// computing the delete should be done after compacting is done
if (record == null) {
compactor.addCommandDelete(id, usedFile);
} else {
record.delete(usedFile);
}
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
} finally {
journalLock.readLock().unlock();
}
}
});
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
if (callback != null) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
}
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (record == null) {
compactor.addCommandDelete(id, usedFile);
} else {
record.delete(usedFile);
}
}
} finally {
journalLock.readLock().unlock();
if (sync && callback == null) {
result.get();
}
}
@ -822,31 +821,62 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
try {
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
appendExecutor.submit(new Runnable() {
JournalTransaction tx = getTransactionInfo(txID);
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
}
if (logger.isTraceEnabled()) {
logger.trace("appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
}
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
} finally {
journalLock.readLock().unlock();
});
}
private void checkKnownRecordID(final long id) throws Exception {
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.lookupRecord(id))) {
return;
}
// retry on the append thread. maybe the appender thread is not keeping up.
Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
journalLock.readLock().lock();
try {
return records.containsKey(id)
|| pendingRecords.contains(id)
|| (compactor != null && compactor.lookupRecord(id));
} finally {
journalLock.readLock().unlock();
}
}
});
if (!known.get()) {
throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
}
}
@ -867,32 +897,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
try {
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
appendExecutor.submit(new Runnable() {
JournalTransaction tx = getTransactionInfo(txID);
@Override
public void run() {
journalLock.readLock().lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
if (logger.isTraceEnabled()) {
logger.trace("appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile);
if ( logger.isTraceEnabled() ) {
logger.trace( "appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
", userRecordType=" +
recordType +
", record = " + record +
", usedFile = " +
usedFile );
}
tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
} catch ( Exception e ) {
ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e );
setErrorCondition( tx, e );
} finally {
journalLock.readLock().unlock();
}
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
} finally {
journalLock.readLock().unlock();
}
});
}
@Override
@ -901,29 +938,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final EncodingSupport record) throws Exception {
checkJournalIsLoaded();
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
try {
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalTransaction tx = getTransactionInfo(txID);
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
if (logger.isTraceEnabled()) {
logger.trace("appendDeleteRecordTransactional::txID=" + txID +
", id=" +
id +
", usedFile = " +
usedFile);
}
if (logger.isTraceEnabled()) {
logger.trace("appendDeleteRecordTransactional::txID=" + txID +
", id=" +
id +
", usedFile = " +
usedFile);
tx.addNegative(usedFile, id);
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
}
tx.addNegative(usedFile, id);
}
} finally {
journalLock.readLock().unlock();
}
});
}
/**
@ -943,36 +986,53 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
journalLock.readLock().lock();
final JournalTransaction tx = getTransactionInfo(txID);
tx.checkErrorCondition();
try {
JournalTransaction tx = getTransactionInfo(txID);
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
if (logger.isTraceEnabled()) {
logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
}
if (callback != null) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.prepare(usedFile);
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
}
tx.prepare(usedFile);
}
});
} finally {
journalLock.readLock().unlock();
if (sync && callback == null) {
result.get();
tx.checkErrorCondition();
}
}
@Override
public void lineUpContext(IOCompletion callback) {
callback.storeLineUp();
if (callback != null) {
callback.storeLineUp();
}
}
private void setErrorCondition(JournalTransaction jt, Throwable t) {
if (jt != null) {
TransactionCallback callback = jt.getCurrentCallback();
if (callback != null && callback.getErrorMessage() != null) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
}
}
}
/**
@ -982,68 +1042,83 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void appendCommitRecord(final long txID,
final boolean sync,
final IOCompletion callback,
boolean lineUpContext) throws Exception {
final boolean lineUpContext) throws Exception {
checkJournalIsLoaded();
if (lineUpContext) {
lineUpContext(callback);
}
journalLock.readLock().lock();
final JournalTransaction tx = transactions.remove(txID);
try {
JournalTransaction tx = transactions.remove(txID);
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
tx.checkErrorCondition();
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (callback != null && lineUpContext) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
}
if (logger.isTraceEnabled()) {
logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
tx.commit(usedFile);
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
}
tx.commit(usedFile);
}
});
} finally {
journalLock.readLock().unlock();
if (sync && callback == null) {
result.get();
tx.checkErrorCondition();
}
}
@Override
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
checkJournalIsLoaded();
lineUpContext(callback);
journalLock.readLock().lock();
final JournalTransaction tx = transactions.remove(txID);
JournalTransaction tx = null;
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
}
try {
tx = transactions.remove(txID);
tx.checkErrorCondition();
if (tx == null) {
throw new IllegalStateException("Cannot find tx with id " + txID);
Future<?> result = appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
setErrorCondition(tx, e);
} finally {
journalLock.readLock().unlock();
}
}
});
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
if (callback != null) {
callback.storeLineUp();
}
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
} finally {
journalLock.readLock().unlock();
if (sync && callback == null) {
result.get();
tx.checkErrorCondition();
}
}
@ -1906,13 +1981,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void debugWait() throws InterruptedException {
fileFactory.flush();
for (JournalTransaction tx : transactions.values()) {
tx.waitCallbacks();
if (appendExecutor != null && !appendExecutor.isShutdown()) {
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = newLatch(1);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
awaitLatch(latch, -1);
}
if (filesExecutor != null && !filesExecutor.isShutdown()) {
// Send something to the closingExecutor, just to make sure we went
// until its end
// Send something to the closingExecutor, just to make sure we went until its end
final CountDownLatch latch = newLatch(1);
filesExecutor.execute(new Runnable() {
@ -1985,20 +2070,52 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// In some tests we need to force the journal to move to a next file
@Override
public void forceMoveNextFile() throws Exception {
journalLock.readLock().lock();
debugWait();
journalLock.writeLock().lock();
try {
synchronized (lockAppend) {
moveNextFile(false);
debugWait();
}
moveNextFile(false);
} finally {
journalLock.readLock().unlock();
journalLock.writeLock().unlock();
}
}
@Override
public void perfBlast(final int pages) {
new PerfBlast(pages).start();
checkJournalIsLoaded();
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
final JournalInternalRecord blastRecord = new JournalInternalRecord() {
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
appendExecutor.submit(new Runnable() {
@Override
public void run() {
journalLock.readLock().lock();
try {
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
} finally {
journalLock.readLock().unlock();
}
}
});
}
// ActiveMQComponent implementation
@ -2031,6 +2148,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
});
appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
return new Thread(r, "JournalImpl::appendExecutor");
}
});
filesRepository.setExecutor(filesExecutor);
fileFactory.start();
@ -2044,46 +2169,50 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new IllegalStateException("Journal is already stopped");
}
setJournalState(JournalState.STOPPED);
// appendExecutor must be shut down first
appendExecutor.shutdown();
if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor();
}
journalLock.writeLock().lock();
try {
synchronized (lockAppend) {
compactorExecutor.shutdown();
setJournalState(JournalState.STOPPED);
compactorExecutor.shutdown();
if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
}
filesExecutor.shutdown();
filesRepository.setExecutor(null);
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
}
try {
for (CountDownLatch latch : latches) {
latch.countDown();
}
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
}
fileFactory.deactivateBuffer();
if (currentFile != null && currentFile.getFile().isOpen()) {
currentFile.getFile().close();
}
filesRepository.clear();
fileFactory.stop();
currentFile = null;
if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
}
filesExecutor.shutdown();
filesRepository.setExecutor(null);
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
}
try {
for (CountDownLatch latch : latches) {
latch.countDown();
}
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
}
fileFactory.deactivateBuffer();
if (currentFile != null && currentFile.getFile().isOpen()) {
currentFile.getFile().close();
}
filesRepository.clear();
fileFactory.stop();
currentFile = null;
} finally {
journalLock.writeLock().unlock();
}
@ -2358,7 +2487,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final boolean sync,
final JournalTransaction tx,
final IOCallback parameterCallback) throws Exception {
checkJournalIsLoaded();
final IOCallback callback;
@ -2552,46 +2680,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
private final class PerfBlast extends Thread {
private final int pages;
private PerfBlast(final int pages) {
super("activemq-perfblast-thread");
this.pages = pages;
}
@Override
public void run() {
synchronized (lockAppend) {
try {
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
JournalInternalRecord blastRecord = new JournalInternalRecord() {
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
} catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
}
}
}
}
@Override
public final void synchronizationLock() {
compactorLock.writeLock().lock();
@ -2624,7 +2712,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
long maxID = -1;
for (long id : fileIds) {
maxID = Math.max(maxID, id);
map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
map.put(id, filesRepository.createRemoteBackupSyncFile(id));
}
filesRepository.setNextFileID(maxID);
return map;

View File

@ -17,11 +17,13 @@
package org.apache.activemq.artemis.core.journal.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -45,12 +47,14 @@ public class JournalTransaction {
private boolean compacting = false;
private Map<JournalFile, TransactionCallback> callbackList;
private final Map<JournalFile, TransactionCallback> callbackList = Collections.synchronizedMap(new HashMap<JournalFile, TransactionCallback>());
private JournalFile lastFile = null;
private final AtomicInteger counter = new AtomicInteger();
private CountDownLatch firstCallbackLatch;
public JournalTransaction(final long id, final JournalRecordProvider journal) {
this.id = id;
this.journal = journal;
@ -139,9 +143,7 @@ public class JournalTransaction {
pendingFiles.clear();
}
if (callbackList != null) {
callbackList.clear();
}
callbackList.clear();
if (pos != null) {
pos.clear();
@ -156,6 +158,8 @@ public class JournalTransaction {
lastFile = null;
currentCallback = null;
firstCallbackLatch = null;
}
/**
@ -166,9 +170,13 @@ public class JournalTransaction {
data.setNumberOfRecords(getCounter(currentFile));
}
public TransactionCallback getCurrentCallback() {
return currentCallback;
}
public TransactionCallback getCallback(final JournalFile file) throws Exception {
if (callbackList == null) {
callbackList = new HashMap<>();
if (firstCallbackLatch != null && callbackList.isEmpty()) {
firstCallbackLatch.countDown();
}
currentCallback = callbackList.get(file);
@ -178,15 +186,19 @@ public class JournalTransaction {
callbackList.put(file, currentCallback);
}
if (currentCallback.getErrorMessage() != null) {
throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
}
currentCallback.countUp();
return currentCallback;
}
public void checkErrorCondition() throws Exception {
if (currentCallback != null) {
if (currentCallback.getErrorMessage() != null) {
throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
}
}
}
public void addPositive(final JournalFile file, final long id, final int size) {
incCounter(file);
@ -264,7 +276,8 @@ public class JournalTransaction {
}
public void waitCallbacks() throws InterruptedException {
if (callbackList != null) {
waitFirstCallback();
synchronized (callbackList) {
for (TransactionCallback callback : callbackList.values()) {
callback.waitCompletion();
}
@ -275,8 +288,15 @@ public class JournalTransaction {
* Wait completion at the latest file only
*/
public void waitCompletion() throws Exception {
if (currentCallback != null) {
currentCallback.waitCompletion();
waitFirstCallback();
currentCallback.waitCompletion();
}
private void waitFirstCallback() throws InterruptedException {
if (currentCallback == null) {
firstCallbackLatch = new CountDownLatch(1);
firstCallbackLatch.await();
firstCallbackLatch = null;
}
}

View File

@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void compactReadError(JournalFile file);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting",
@Message(id = 142012, value = "Couldn't find tx={0} to merge after compacting",
format = Message.Format.MESSAGE_FORMAT)
void compactMergeError(Long id);
@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void uncomittedTxFound(Long id);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds",
@Message(id = 142016, value = "Could not stop compactor executor after 120 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopCompactor();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds",
@Message(id = 142017, value = "Could not stop journal executor after 60 seconds",
format = Message.Format.MESSAGE_FORMAT)
void couldNotStopJournalExecutor();
@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
void deletingOrphanedFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
@Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
void errorClosingFile(String fileToDelete);
@LogMessage(level = Logger.Level.WARN)
@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger {
@Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT)
void errorSubmittingWrite(@Cause Throwable e);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT)
void couldNotStopJournalAppendExecutor();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT)
void errorDeletingFile(Object e);

View File

@ -532,6 +532,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
journalImpl.debugWait();
System.out.println("Files = " + factory.listFiles("tt"));
SequentialFile file = factory.createSequentialFile("tt-1.tt");
@ -598,6 +600,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(2L, false);
journalImpl.debugWait();
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@ -697,6 +701,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.appendCommitRecord(1L, false);
journalImpl.debugWait();
SequentialFile file = factory.createSequentialFile("tt-1.tt");
file.open();
@ -936,8 +942,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.forceMoveNextFile();
// Reclaiming should still be able to reclaim a file if a transaction was
// ignored
// Reclaiming should still be able to reclaim a file if a transaction was ignored
journalImpl.checkReclaimStatus();
Assert.assertEquals(2, factory.listFiles("tt").size());
@ -1109,7 +1114,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
}
@Test
public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception {
public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception {
testReclaimingAfterConcurrentAddsAndDeletes(true);
}
@Test
public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception {
testReclaimingAfterConcurrentAddsAndDeletes(false);
}
public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception {
final int JOURNAL_SIZE = 10 * 1024;
setupAndLoadJournal(JOURNAL_SIZE, 1);
@ -1131,8 +1145,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
latchReady.countDown();
ActiveMQTestBase.waitForLatch(latchStart);
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) {
journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
if (transactional) {
journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
} else {
journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false);
}
queueDelete.offer(i);
}
finishedOK.incrementAndGet();
@ -1153,7 +1173,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
if (toDelete == null) {
break;
}
journalImpl.appendDeleteRecord(toDelete, false);
if (transactional) {
journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1));
journalImpl.appendCommitRecord(i, false);
} else {
journalImpl.appendDeleteRecord(toDelete, false);
}
}
finishedOK.incrementAndGet();
} catch (Exception e) {

View File

@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0));
}
journalImpl.debugWait();
latch.countDown();
factory.setHoldCallbacks(false, null);
if (isCommit) {
@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
}
}
// If a callback error already arrived, we should just throw the exception
// right away
// If a callback error already arrived, we should just throw the exception right away
@Test
public void testPreviousError() throws Exception {
final int JOURNAL_SIZE = 20000;
@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0));
journalImpl.debugWait();
factory.flushAllCallbacks();
factory.setGenerateErrors(false);
@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase {
try {
journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
Assert.fail("Exception expected"); // An exception already happened in one
// of the elements on this transaction.
// We can't accept any more elements on
// the transaction
Assert.fail("Exception expected");
// An exception already happened in one of the elements on this transaction.
// We can't accept any more elements on the transaction
} catch (Exception ignored) {
}
}