diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java index b392f6ffb0..f290eba623 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java @@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.JournalRecord; import org.apache.activemq.artemis.utils.Base64; @Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files") @@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract { long lineNumber = 0; - Map journalRecords = journal.getRecords(); - while ((line = buffReader.readLine()) != null) { lineNumber++; String[] splitLine = line.split(","); @@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract { counter.incrementAndGet(); RecordInfo info = parseRecord(lineProperties); journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); - } else if (operation.equals("AddRecordTX")) { - long txID = parseLong("txID", lineProperties); - AtomicInteger counter = getCounter(txID, txCounters); - counter.incrementAndGet(); - RecordInfo info = parseRecord(lineProperties); - journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data); } else if (operation.equals("UpdateTX")) { long txID = parseLong("txID", lineProperties); AtomicInteger counter = getCounter(txID, txCounters); @@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract { } else if (operation.equals("DeleteRecord")) { long id = parseLong("id", lineProperties); - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) { + try { journal.appendDeleteRecord(id, false); + } catch (IllegalStateException ignored) { + // If not found it means the append/update records were reclaimed already } } else if (operation.equals("DeleteRecordTX")) { long txID = parseLong("txID", lineProperties); long id = parseLong("id", lineProperties); AtomicInteger counter = getCounter(txID, txCounters); counter.incrementAndGet(); - - // If not found it means the append/update records were reclaimed already - if (journalRecords.get(id) != null) { - journal.appendDeleteRecordTransactional(txID, id); - } + journal.appendDeleteRecordTransactional(txID, id); } else if (operation.equals("Prepare")) { long txID = parseLong("txID", lineProperties); int numberOfRecords = parseInt("numberOfRecords", lineProperties); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java similarity index 100% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java similarity index 96% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java index 609af8ead2..c7d5c03166 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -22,7 +22,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.jboss.logging.Logger; /** @@ -104,7 +103,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory { // This could happen during shutdowns. Nothing to be concerned about here logger.debug("Interrupted Thread", e); } catch (Throwable t) { - ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t); + logger.warn(t.getMessage(), t); } task = tasks.poll(); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java new file mode 100644 index 0000000000..eedfef492f --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SimpleFuture implements Future { + + public SimpleFuture() { + } + + V value; + Exception exception; + + private final CountDownLatch latch = new CountDownLatch(1); + + boolean canceled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + canceled = true; + latch.countDown(); + return true; + } + + @Override + public boolean isCancelled() { + return canceled; + } + + @Override + public boolean isDone() { + return latch.getCount() <= 0; + } + + public void fail(Exception e) { + this.exception = e; + latch.countDown(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + latch.await(); + if (this.exception != null) { + throw new ExecutionException(this.exception); + } + return value; + } + + public void set(V v) { + this.value = v; + latch.countDown(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + latch.await(timeout, unit); + return value; + } +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java new file mode 100644 index 0000000000..00fd5d7dc4 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SimpleFutureTest { + + @Rule + public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); + + @Test + public void testFuture() throws Exception { + final long randomStart = System.currentTimeMillis(); + final SimpleFuture simpleFuture = new SimpleFuture<>(); + Thread t = new Thread() { + @Override + public void run() { + simpleFuture.set(randomStart); + } + }; + t.start(); + + Assert.assertEquals(randomStart, simpleFuture.get().longValue()); + } + + + @Test + public void testException() throws Exception { + final SimpleFuture simpleFuture = new SimpleFuture<>(); + Thread t = new Thread() { + @Override + public void run() { + simpleFuture.fail(new Exception("hello")); + } + }; + t.start(); + + boolean failed = false; + try { + simpleFuture.get(); + } catch (Exception e) { + failed = true; + } + + + Assert.assertTrue(failed); + } + + + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index b6d5e622c2..43db1f7033 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -29,11 +29,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +47,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -160,6 +163,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // Compacting may replace this structure private final ConcurrentMap records = new ConcurrentHashMap<>(); + private final Set pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap()); + // Compacting may replace this structure private final ConcurrentMap transactions = new ConcurrentHashMap<>(); @@ -172,12 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private ExecutorService compactorExecutor = null; - private ConcurrentHashSet latches = new ConcurrentHashSet<>(); + private ExecutorService appendExecutor = null; - // Lock used during the append of records - // This lock doesn't represent a global lock. - // After a record is appended, the usedFile can't be changed until the positives and negatives are updated - private final Object lockAppend = new Object(); + private ConcurrentHashSet latches = new ConcurrentHashSet<>(); /** * We don't lock the journal during the whole compacting operation. During compacting we only @@ -688,32 +690,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + pendingRecords.add(id); - journalLock.readLock().lock(); + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); + records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); - try { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendAddRecord::id=" + id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecord::id=" + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + pendingRecords.remove(id); + journalLock.readLock().unlock(); } - - records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); } - } finally { - journalLock.readLock().unlock(); + }); + + if (sync && callback == null) { + result.get(); } } @@ -724,94 +731,86 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + checkKnownRecordID(id); - journalLock.readLock().lock(); + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalRecord jrnRecord = records.get(id); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); - try { - JournalRecord jrnRecord = records.get(id); + if (logger.isTraceEnabled()) { + logger.trace("appendUpdateRecord::id=" + id + + ", userRecordType=" + + recordType + + ", usedFile = " + + usedFile); + } - if (jrnRecord == null) { - if (!(compactor != null && compactor.lookupRecord(id))) { - throw new IllegalStateException("Cannot find add info " + id); + // record==null here could only mean there is a compactor + // computing the delete should be done after compacting is done + if (jrnRecord == null) { + compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); + } else { + jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + journalLock.readLock().unlock(); } } + }); - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendUpdateRecord::id=" + id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); - } - - // record== null here could only mean there is a compactor, and computing the delete should be done after - // compacting is done - if (jrnRecord == null) { - compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize()); - } else { - jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize()); - } - } - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); } } @Override public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); + checkKnownRecordID(id); - journalLock.readLock().lock(); - try { + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalRecord record = null; + if (compactor == null) { + record = records.remove(id); + } - JournalRecord record = null; + JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); + JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback); - if (compactor == null) { - record = records.remove(id); + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile); + } - if (record == null) { - throw new IllegalStateException("Cannot find add info " + id); - } - } else { - if (!records.containsKey(id) && !compactor.lookupRecord(id)) { - throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); + // record==null here could only mean there is a compactor + // computing the delete should be done after compacting is done + if (record == null) { + compactor.addCommandDelete(id, usedFile); + } else { + record.delete(usedFile); + } + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + } finally { + journalLock.readLock().unlock(); } } + }); - JournalInternalRecord deleteRecord = new JournalDeleteRecord(id); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile); - } - - // record== null here could only mean there is a compactor, and computing the delete should be done after - // compacting is done - if (record == null) { - compactor.addCommandDelete(id, usedFile); - } else { - record.delete(usedFile); - } - - } - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); } } @@ -822,31 +821,62 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + appendExecutor.submit(new Runnable() { - JournalTransaction tx = getTransactionInfo(txID); + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); + if (logger.isTraceEnabled()) { + logger.trace("appendAddRecordTransactional:txID=" + txID + + ",id=" + + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendAddRecordTransactional:txID=" + txID + - ",id=" + - id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + tx.addPositive(usedFile, id, addRecord.getEncodeSize()); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.addPositive(usedFile, id, addRecord.getEncodeSize()); } - } finally { - journalLock.readLock().unlock(); + }); + } + + private void checkKnownRecordID(final long id) throws Exception { + if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.lookupRecord(id))) { + return; + } + + // retry on the append thread. maybe the appender thread is not keeping up. + Future known = appendExecutor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + journalLock.readLock().lock(); + try { + return records.containsKey(id) + || pendingRecords.contains(id) + || (compactor != null && compactor.lookupRecord(id)); + } finally { + journalLock.readLock().unlock(); + } + } + }); + + if (!known.get()) { + throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records"); } } @@ -867,32 +897,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + appendExecutor.submit(new Runnable() { - JournalTransaction tx = getTransactionInfo(txID); + @Override + public void run() { + journalLock.readLock().lock(); + try { - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record ); + JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null ); - if (logger.isTraceEnabled()) { - logger.trace("appendUpdateRecordTransactional::txID=" + txID + - ",id=" + - id + - ", userRecordType=" + - recordType + - ", record = " + record + - ", usedFile = " + - usedFile); + if ( logger.isTraceEnabled() ) { + logger.trace( "appendUpdateRecordTransactional::txID=" + txID + + ",id=" + + id + + ", userRecordType=" + + recordType + + ", record = " + record + + ", usedFile = " + + usedFile ); + } + + tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() ); + } catch ( Exception e ) { + ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e ); + setErrorCondition( tx, e ); + } finally { + journalLock.readLock().unlock(); } - - tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize()); } - } finally { - journalLock.readLock().unlock(); - } + }); } @Override @@ -901,29 +938,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final EncodingSupport record) throws Exception { checkJournalIsLoaded(); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); + appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { - JournalTransaction tx = getTransactionInfo(txID); + JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record); + JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null); + if (logger.isTraceEnabled()) { + logger.trace("appendDeleteRecordTransactional::txID=" + txID + + ", id=" + + id + + ", usedFile = " + + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendDeleteRecordTransactional::txID=" + txID + - ", id=" + - id + - ", usedFile = " + - usedFile); + tx.addNegative(usedFile, id); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.addNegative(usedFile, id); } - } finally { - journalLock.readLock().unlock(); - } + }); } /** @@ -943,36 +986,53 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); - journalLock.readLock().lock(); + final JournalTransaction tx = getTransactionInfo(txID); + tx.checkErrorCondition(); - try { - JournalTransaction tx = getTransactionInfo(txID); + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); + JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); - JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData); + if (logger.isTraceEnabled()) { + logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile); + } - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback); - - if (logger.isTraceEnabled()) { - logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile); + tx.prepare(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.prepare(usedFile); } + }); - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @Override public void lineUpContext(IOCompletion callback) { - callback.storeLineUp(); + if (callback != null) { + callback.storeLineUp(); + } + } + + private void setErrorCondition(JournalTransaction jt, Throwable t) { + if (jt != null) { + TransactionCallback callback = jt.getCurrentCallback(); + if (callback != null && callback.getErrorMessage() != null) { + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage()); + } + } } /** @@ -982,68 +1042,83 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, - boolean lineUpContext) throws Exception { + final boolean lineUpContext) throws Exception { checkJournalIsLoaded(); + if (lineUpContext) { + lineUpContext(callback); + } - journalLock.readLock().lock(); + final JournalTransaction tx = transactions.remove(txID); - try { - JournalTransaction tx = transactions.remove(txID); + if (tx == null) { + throw new IllegalStateException("Cannot find tx with id " + txID); + } - if (tx == null) { - throw new IllegalStateException("Cannot find tx with id " + txID); - } + tx.checkErrorCondition(); - JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null); + JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); - if (callback != null && lineUpContext) { - callback.storeLineUp(); - } - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback); + if (logger.isTraceEnabled()) { + logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile); + } - if (logger.isTraceEnabled()) { - logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile); + tx.commit(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); } - - tx.commit(usedFile); } + }); - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @Override public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); + lineUpContext(callback); - journalLock.readLock().lock(); + final JournalTransaction tx = transactions.remove(txID); - JournalTransaction tx = null; + if (tx == null) { + throw new IllegalStateException("Cannot find tx with id " + txID); + } - try { - tx = transactions.remove(txID); + tx.checkErrorCondition(); - if (tx == null) { - throw new IllegalStateException("Cannot find tx with id " + txID); + Future result = appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); + JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); + + tx.rollback(usedFile); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e); + setErrorCondition(tx, e); + } finally { + journalLock.readLock().unlock(); + } } + }); - JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID); - - if (callback != null) { - callback.storeLineUp(); - } - - synchronized (lockAppend) { - JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback); - - tx.rollback(usedFile); - } - - } finally { - journalLock.readLock().unlock(); + if (sync && callback == null) { + result.get(); + tx.checkErrorCondition(); } } @@ -1906,13 +1981,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void debugWait() throws InterruptedException { fileFactory.flush(); - for (JournalTransaction tx : transactions.values()) { - tx.waitCallbacks(); + if (appendExecutor != null && !appendExecutor.isShutdown()) { + // Send something to the closingExecutor, just to make sure we went until its end + final CountDownLatch latch = newLatch(1); + + appendExecutor.execute(new Runnable() { + + @Override + public void run() { + latch.countDown(); + } + + }); + awaitLatch(latch, -1); } if (filesExecutor != null && !filesExecutor.isShutdown()) { - // Send something to the closingExecutor, just to make sure we went - // until its end + // Send something to the closingExecutor, just to make sure we went until its end final CountDownLatch latch = newLatch(1); filesExecutor.execute(new Runnable() { @@ -1985,20 +2070,52 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // In some tests we need to force the journal to move to a next file @Override public void forceMoveNextFile() throws Exception { - journalLock.readLock().lock(); + debugWait(); + journalLock.writeLock().lock(); try { - synchronized (lockAppend) { - moveNextFile(false); - debugWait(); - } + moveNextFile(false); } finally { - journalLock.readLock().unlock(); + journalLock.writeLock().unlock(); } } @Override public void perfBlast(final int pages) { - new PerfBlast(pages).start(); + + checkJournalIsLoaded(); + + final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); + + final JournalInternalRecord blastRecord = new JournalInternalRecord() { + + @Override + public int getEncodeSize() { + return byteEncoder.getEncodeSize(); + } + + @Override + public void encode(final ActiveMQBuffer buffer) { + byteEncoder.encode(buffer); + } + }; + + appendExecutor.submit(new Runnable() { + @Override + public void run() { + journalLock.readLock().lock(); + try { + + for (int i = 0; i < pages; i++) { + appendRecord(blastRecord, false, false, null, null); + } + + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); + } finally { + journalLock.readLock().unlock(); + } + } + }); } // ActiveMQComponent implementation @@ -2031,6 +2148,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } }); + appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + @Override + public Thread newThread(final Runnable r) { + return new Thread(r, "JournalImpl::appendExecutor"); + } + }); + filesRepository.setExecutor(filesExecutor); fileFactory.start(); @@ -2044,46 +2169,50 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw new IllegalStateException("Journal is already stopped"); } + setJournalState(JournalState.STOPPED); + + // appendExecutor must be shut down first + appendExecutor.shutdown(); + + if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor(); + } + journalLock.writeLock().lock(); try { - synchronized (lockAppend) { + compactorExecutor.shutdown(); - setJournalState(JournalState.STOPPED); - - compactorExecutor.shutdown(); - - if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopCompactor(); - } - - filesExecutor.shutdown(); - - filesRepository.setExecutor(null); - - if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor(); - } - - try { - for (CountDownLatch latch : latches) { - latch.countDown(); - } - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); - } - - fileFactory.deactivateBuffer(); - - if (currentFile != null && currentFile.getFile().isOpen()) { - currentFile.getFile().close(); - } - - filesRepository.clear(); - - fileFactory.stop(); - - currentFile = null; + if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopCompactor(); } + + filesExecutor.shutdown(); + + filesRepository.setExecutor(null); + + if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor(); + } + + try { + for (CountDownLatch latch : latches) { + latch.countDown(); + } + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + } + + fileFactory.deactivateBuffer(); + + if (currentFile != null && currentFile.getFile().isOpen()) { + currentFile.getFile().close(); + } + + filesRepository.clear(); + + fileFactory.stop(); + + currentFile = null; } finally { journalLock.writeLock().unlock(); } @@ -2358,7 +2487,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final boolean sync, final JournalTransaction tx, final IOCallback parameterCallback) throws Exception { - checkJournalIsLoaded(); final IOCallback callback; @@ -2552,46 +2680,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - private final class PerfBlast extends Thread { - - private final int pages; - - private PerfBlast(final int pages) { - super("activemq-perfblast-thread"); - - this.pages = pages; - } - - @Override - public void run() { - synchronized (lockAppend) { - try { - - final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); - - JournalInternalRecord blastRecord = new JournalInternalRecord() { - - @Override - public int getEncodeSize() { - return byteEncoder.getEncodeSize(); - } - - @Override - public void encode(final ActiveMQBuffer buffer) { - byteEncoder.encode(buffer); - } - }; - - for (int i = 0; i < pages; i++) { - appendRecord(blastRecord, false, false, null, null); - } - } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); - } - } - } - } - @Override public final void synchronizationLock() { compactorLock.writeLock().lock(); @@ -2624,7 +2712,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal long maxID = -1; for (long id : fileIds) { maxID = Math.max(maxID, id); - map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id)); + map.put(id, filesRepository.createRemoteBackupSyncFile(id)); } filesRepository.setNextFileID(maxID); return map; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java index 6e41c1799c..1542bd4b05 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java @@ -17,11 +17,13 @@ package org.apache.activemq.artemis.core.journal.impl; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -45,12 +47,14 @@ public class JournalTransaction { private boolean compacting = false; - private Map callbackList; + private final Map callbackList = Collections.synchronizedMap(new HashMap()); private JournalFile lastFile = null; private final AtomicInteger counter = new AtomicInteger(); + private CountDownLatch firstCallbackLatch; + public JournalTransaction(final long id, final JournalRecordProvider journal) { this.id = id; this.journal = journal; @@ -139,9 +143,7 @@ public class JournalTransaction { pendingFiles.clear(); } - if (callbackList != null) { - callbackList.clear(); - } + callbackList.clear(); if (pos != null) { pos.clear(); @@ -156,6 +158,8 @@ public class JournalTransaction { lastFile = null; currentCallback = null; + + firstCallbackLatch = null; } /** @@ -166,9 +170,13 @@ public class JournalTransaction { data.setNumberOfRecords(getCounter(currentFile)); } + public TransactionCallback getCurrentCallback() { + return currentCallback; + } + public TransactionCallback getCallback(final JournalFile file) throws Exception { - if (callbackList == null) { - callbackList = new HashMap<>(); + if (firstCallbackLatch != null && callbackList.isEmpty()) { + firstCallbackLatch.countDown(); } currentCallback = callbackList.get(file); @@ -178,15 +186,19 @@ public class JournalTransaction { callbackList.put(file, currentCallback); } - if (currentCallback.getErrorMessage() != null) { - throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage()); - } - currentCallback.countUp(); return currentCallback; } + public void checkErrorCondition() throws Exception { + if (currentCallback != null) { + if (currentCallback.getErrorMessage() != null) { + throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage()); + } + } + } + public void addPositive(final JournalFile file, final long id, final int size) { incCounter(file); @@ -264,7 +276,8 @@ public class JournalTransaction { } public void waitCallbacks() throws InterruptedException { - if (callbackList != null) { + waitFirstCallback(); + synchronized (callbackList) { for (TransactionCallback callback : callbackList.values()) { callback.waitCompletion(); } @@ -275,8 +288,15 @@ public class JournalTransaction { * Wait completion at the latest file only */ public void waitCompletion() throws Exception { - if (currentCallback != null) { - currentCallback.waitCompletion(); + waitFirstCallback(); + currentCallback.waitCompletion(); + } + + private void waitFirstCallback() throws InterruptedException { + if (currentCallback == null) { + firstCallbackLatch = new CountDownLatch(1); + firstCallbackLatch.await(); + firstCallbackLatch = null; } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java index 198185c1ae..6758c64283 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java @@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger { void compactReadError(JournalFile file); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting", + @Message(id = 142012, value = "Couldn't find tx={0} to merge after compacting", format = Message.Format.MESSAGE_FORMAT) void compactMergeError(Long id); @@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger { void uncomittedTxFound(Long id); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds", + @Message(id = 142016, value = "Could not stop compactor executor after 120 seconds", format = Message.Format.MESSAGE_FORMAT) void couldNotStopCompactor(); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds", + @Message(id = 142017, value = "Could not stop journal executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT) void couldNotStopJournalExecutor(); @@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger { void deletingOrphanedFile(String fileToDelete); @LogMessage(level = Logger.Level.WARN) - @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT) void errorClosingFile(String fileToDelete); @LogMessage(level = Logger.Level.WARN) @@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger { @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT) void errorSubmittingWrite(@Cause Throwable e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT) + void couldNotStopJournalAppendExecutor(); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT) void errorDeletingFile(Object e); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index 080db78bfa..5e27b36800 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -532,6 +532,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(1L, false); + journalImpl.debugWait(); + System.out.println("Files = " + factory.listFiles("tt")); SequentialFile file = factory.createSequentialFile("tt-1.tt"); @@ -598,6 +600,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(2L, false); + journalImpl.debugWait(); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -697,6 +701,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.appendCommitRecord(1L, false); + journalImpl.debugWait(); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -936,8 +942,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { journalImpl.forceMoveNextFile(); - // Reclaiming should still be able to reclaim a file if a transaction was - // ignored + // Reclaiming should still be able to reclaim a file if a transaction was ignored journalImpl.checkReclaimStatus(); Assert.assertEquals(2, factory.listFiles("tt").size()); @@ -1109,7 +1114,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { } @Test - public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception { + public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception { + testReclaimingAfterConcurrentAddsAndDeletes(true); + } + + @Test + public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception { + testReclaimingAfterConcurrentAddsAndDeletes(false); + } + + public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception { final int JOURNAL_SIZE = 10 * 1024; setupAndLoadJournal(JOURNAL_SIZE, 1); @@ -1131,8 +1145,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { latchReady.countDown(); ActiveMQTestBase.waitForLatch(latchStart); for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) { - journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1)); - journalImpl.appendCommitRecord(i, false); + + if (transactional) { + journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1)); + journalImpl.appendCommitRecord(i, false); + } else { + journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false); + } + queueDelete.offer(i); } finishedOK.incrementAndGet(); @@ -1153,7 +1173,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase { if (toDelete == null) { break; } - journalImpl.appendDeleteRecord(toDelete, false); + + if (transactional) { + journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1)); + journalImpl.appendCommitRecord(i, false); + } else { + journalImpl.appendDeleteRecord(toDelete, false); + } + } finishedOK.incrementAndGet(); } catch (Exception e) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java index 41058c685b..204600e2f2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java @@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase { journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0)); } + journalImpl.debugWait(); + latch.countDown(); factory.setHoldCallbacks(false, null); if (isCommit) { @@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase { } } - // If a callback error already arrived, we should just throw the exception - // right away + // If a callback error already arrived, we should just throw the exception right away @Test public void testPreviousError() throws Exception { final int JOURNAL_SIZE = 20000; @@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase { journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0)); + journalImpl.debugWait(); + factory.flushAllCallbacks(); factory.setGenerateErrors(false); @@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase { try { journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0)); - Assert.fail("Exception expected"); // An exception already happened in one - // of the elements on this transaction. - // We can't accept any more elements on - // the transaction + Assert.fail("Exception expected"); + // An exception already happened in one of the elements on this transaction. + // We can't accept any more elements on the transaction } catch (Exception ignored) { + } }