ARTEMIS-3084 Eliminate Block on moving to next file on libaio

This commit is contained in:
Clebert Suconic 2021-01-27 17:40:10 -05:00
parent 2cf8d5c181
commit c019218c4e
20 changed files with 486 additions and 199 deletions

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public abstract class AbstractLatch {
/**
* Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
*
* @see AbstractQueuedSynchronizer
*/
@SuppressWarnings("serial")
protected static class CountSync extends AbstractQueuedSynchronizer {
private CountSync(int count) {
setState(count);
}
public int getCount() {
return getState();
}
public void setCount(final int count) {
setState(count);
}
@Override
public int tryAcquireShared(final int numberOfAqcquires) {
return getState() == 0 ? 1 : -1;
}
public void add() {
for (;;) {
int actualState = getState();
int newState = actualState + 1;
if (compareAndSetState(actualState, newState)) {
return;
}
}
}
@Override
public boolean tryReleaseShared(final int numberOfReleases) {
for (;;) {
int actualState = getState();
if (actualState == 0) {
return true;
}
int newState = actualState - numberOfReleases;
if (newState < 0) {
newState = 0;
}
if (compareAndSetState(actualState, newState)) {
return newState == 0;
}
}
}
}
protected final CountSync control;
public AbstractLatch() {
this(0);
}
public AbstractLatch(final int count) {
control = new CountSync(count);
}
public int getCount() {
return control.getCount();
}
public void setCount(final int count) {
control.setCount(count);
}
public void countUp() {
control.add();
}
public abstract void countDown();
public abstract void countDown(int count);
public void await() throws InterruptedException {
control.acquireSharedInterruptibly(1);
}
public boolean await(final long milliseconds) throws InterruptedException {
return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
}
public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
/** An automatic latch has the same semantic as the ReusableLatch
* However this class has a replaceable callback that could be called
* when the number of elements reach zero.
* With that you can either block to wait completion, or to send a callback to be
* used when it reaches 0. */
public class AutomaticLatch extends AbstractLatch {
volatile Runnable afterCompletion;
public AutomaticLatch() {
}
public AutomaticLatch(int count) {
super(count);
}
// it will execute when the counter reaches 0.
// notice that since the latch is reusable,
// the runnable will be cleared once it reached 0
public void afterCompletion(final Runnable newRun) {
// We first raise one element up
// to avoid a race on it being called while another thread sets it down to 0
countUp();
if (this.afterCompletion != null) {
// this should not happen really,
// but just in case it ever happens,
// I would rather have a runnable depending into other runnables instead of a collection here
// as the use case I'm after is a single runnable
final Runnable oldRun = afterCompletion;
this.afterCompletion = () -> {
oldRun.run();
newRun.run();
};
} else {
this.afterCompletion = newRun;
}
// then we countDown so it will be instantly 0 if nothing else done it
// or it then just keep flow as usual
countDown();
}
@Override
public final void countDown() {
if (control.releaseShared(1)) {
doRun();
}
}
private void doRun() {
Runnable toRun = afterCompletion;
afterCompletion = null;
if (toRun != null) {
toRun.run();
}
}
@Override
public final void countDown(final int count) {
if (control.releaseShared(count)) {
doRun();
}
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* <p>This class will use the framework provided to by AbstractQueuedSynchronizer.</p>
* <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
@ -33,103 +30,23 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
*
* <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
*/
public class ReusableLatch {
/**
* Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
*
* @see AbstractQueuedSynchronizer
*/
@SuppressWarnings("serial")
private static class CountSync extends AbstractQueuedSynchronizer {
private CountSync(int count) {
setState(count);
}
public int getCount() {
return getState();
}
public void setCount(final int count) {
setState(count);
}
@Override
public int tryAcquireShared(final int numberOfAqcquires) {
return getState() == 0 ? 1 : -1;
}
public void add() {
for (;;) {
int actualState = getState();
int newState = actualState + 1;
if (compareAndSetState(actualState, newState)) {
return;
}
}
}
@Override
public boolean tryReleaseShared(final int numberOfReleases) {
for (;;) {
int actualState = getState();
if (actualState == 0) {
return true;
}
int newState = actualState - numberOfReleases;
if (newState < 0) {
newState = 0;
}
if (compareAndSetState(actualState, newState)) {
return newState == 0;
}
}
}
}
private final CountSync control;
public class ReusableLatch extends AbstractLatch {
public ReusableLatch() {
this(0);
super();
}
public ReusableLatch(final int count) {
control = new CountSync(count);
}
public int getCount() {
return control.getCount();
}
public void setCount(final int count) {
control.setCount(count);
}
public void countUp() {
control.add();
public ReusableLatch(int count) {
super(count);
}
@Override
public void countDown() {
control.releaseShared(1);
}
@Override
public void countDown(final int count) {
control.releaseShared(count);
}
public void await() throws InterruptedException {
control.acquireSharedInterruptibly(1);
}
public boolean await(final long milliseconds) throws InterruptedException {
return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
}
public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException {
return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
}
}

View File

@ -302,11 +302,11 @@ public class JDBCSequentialFile implements SequentialFile {
@Override
public void close() throws Exception {
close(true);
close(true, true);
}
@Override
public void close(boolean waitOnSync) throws Exception {
public void close(boolean waitOnSync, boolean block) throws Exception {
isOpen.set(false);
if (waitOnSync) {
sync();

View File

@ -44,7 +44,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
private static final Logger logger = Logger.getLogger(AbstractSequentialFile.class);
private File file;
protected File file;
protected final File directory;
@ -96,7 +96,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
public final void delete() throws IOException, InterruptedException, ActiveMQException {
try {
if (isOpen()) {
close(false);
close(false, false);
}
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {

View File

@ -31,12 +31,24 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
public interface SequentialFile {
default boolean isPending() {
return false;
}
default void waitNotPending() {
return;
}
boolean isOpen();
boolean exists();
void open() throws Exception;
default void afterComplete(Runnable run) {
run.run();
}
/**
* The maximum number of simultaneous writes accepted
*
@ -65,6 +77,14 @@ public interface SequentialFile {
void write(EncodingSupport bytes, boolean sync) throws Exception;
default void refUp() {
}
default void refDown() {
}
/**
* Write directly to the file without using any buffer
*
@ -119,7 +139,7 @@ public interface SequentialFile {
/** When closing a file from a finalize block, you cant wait on syncs or anything like that.
* otherwise the VM may hung. Especially on the testsuite. */
default void close(boolean waitSync) throws Exception {
default void close(boolean waitSync, boolean blockOnWait) throws Exception {
// by default most implementations are just using the regular close..
// if the close needs sync, please use this parameter or fianlizations may get stuck
close();

View File

@ -18,12 +18,9 @@ package org.apache.activemq.artemis.core.io.aio;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -35,20 +32,23 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.AutomaticLatch;
import org.jboss.logging.Logger;
public class AIOSequentialFile extends AbstractSequentialFile {
/** This class is implementing Runnable to reuse a callback to close it. */
public class AIOSequentialFile extends AbstractSequentialFile {
private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class);
private boolean opened = false;
private volatile boolean pendingClose = false;
private LibaioFile aioFile;
private final AIOSequentialFileFactory aioFactory;
private final ReusableLatch pendingCallbacks = new ReusableLatch();
private final AutomaticLatch pendingCallbacks = new AutomaticLatch();
/**
* Used to determine the next writing sequence
@ -78,6 +78,16 @@ public class AIOSequentialFile extends AbstractSequentialFile {
this.aioFactory = factory;
}
@Override
public void refUp() {
pendingCallbacks.countUp();
}
@Override
public void refDown() {
pendingCallbacks.countDown();
}
@Override
public ByteBuffer map(int position, long size) throws IOException {
return null;
@ -101,43 +111,57 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
close(true, false);
}
private void actualClose() {
try {
aioFile.close();
} catch (IOException e) {
factory.onIOError(e, e.getMessage(), this);
} finally {
aioFile = null;
pendingClose = false;
aioFactory.afterClose();
}
}
@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
public boolean isPending() {
return pendingClose;
}
@Override
public void waitNotPending() {
try {
pendingCallbacks.await();
} catch (InterruptedException e) {
// nothing to be done here, other than log it and forward it
logger.warn(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
@Override
public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException {
if (!opened) {
return;
}
aioFactory.beforeClose();
super.close();
try {
if (waitSync) {
final String fileName = this.getFileName();
try {
int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++;
if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
}
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
}
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
}
opened = false;
pendingClose = true;
this.timedBuffer = null;
if (waitSync) {
pendingCallbacks.afterCompletion(this::actualClose);
if (blockOnWait) {
pendingCallbacks.await();
}
} finally {
opened = false;
timedBuffer = null;
aioFile.close();
aioFile = null;
} else {
actualClose();
}
}
@ -160,6 +184,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
if (opened) {
return;
}
opened = true;
if (logger.isTraceEnabled()) {
@ -334,7 +361,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public String toString() {
return "AIOSequentialFile:" + getFile().getAbsolutePath();
return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + ", pendingClose=" + pendingClose + ", pendingCallbacks=" + pendingCallbacks + '}';
}
// Private methods
@ -342,7 +369,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
private void checkOpened() {
if (aioFile == null || !opened) {
throw new NullPointerException("File not opened, file=null");
throw new NullPointerException("File not opened, file=null on fileName = " + getFileName());
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.util.internal.PlatformDependent;
@ -38,6 +39,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpmcArrayQueue;
@ -60,6 +62,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
}
}
private final ReusableLatch pendingClose = new ReusableLatch(0);
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
private volatile boolean reuseBuffers = true;
@ -74,6 +78,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private static final String AIO_TEST_FILE = ".aio-test";
public void beforeClose() {
pendingClose.countUp();
}
public void afterClose() {
pendingClose.countDown();
}
public AIOSequentialFileFactory(final File journalDir, int maxIO) {
this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null);
}
@ -287,6 +299,15 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (this.running.compareAndSet(true, false)) {
buffersControl.stop();
try {
// if we stop libaioContext before we finish this, we will never get confirmation on items previously sent
if (!pendingClose.await(1, TimeUnit.MINUTES)) {
ActiveMQJournalLogger.LOGGER.warn("Timeout on waiting for asynchronous close");
}
} catch (Throwable throwableToLog) {
logger.warn(throwableToLog.getMessage(), throwableToLog);
}
libaioContext.close();
libaioContext = null;

View File

@ -142,7 +142,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void delete() {
close(false);
close(false, false);
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
@ -355,11 +355,11 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void close() {
close(true);
close(true, true);
}
@Override
public void close(boolean waitOnSync) {
public void close(boolean waitOnSync, boolean block) {
if (this.mappedFile != null) {
if (waitOnSync && factory.isDatasync())
this.mappedFile.force();

View File

@ -196,11 +196,11 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
close(true, true);
}
@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
public synchronized void close(boolean waitSync, boolean blockOnPending) throws IOException, InterruptedException, ActiveMQException {
super.close();
if (DEBUG_OPENS) {

View File

@ -149,7 +149,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
return controlFile;
} finally {
controlFile.close();
controlFile.close(false, false);
}
}
@ -228,7 +228,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
byteBuffer.clear().position(readerIndex).limit(readerIndex + writingChannel.readableBytes());
sequentialFile.blockingWriteDirect(byteBuffer, true, false);
} finally {
sequentialFile.close();
sequentialFile.close(false, false);
newDataFiles.add(currentFile);
}
}

View File

@ -87,7 +87,7 @@ public class JournalFileImpl implements JournalFile {
@Override
public boolean isCanReclaim() {
return posReclaimCriteria && negReclaimCriteria;
return posReclaimCriteria && negReclaimCriteria && !file.isPending();
}
@Override

View File

@ -151,8 +151,17 @@ public class JournalFilesRepository {
}
public void clear() throws Exception {
for (JournalFile file : dataFiles) {
file.getFile().waitNotPending();
}
dataFiles.clear();
for (JournalFile file : freeFiles) {
file.getFile().waitNotPending();
}
freeFiles.clear();
freeFilesCount.set(0);
@ -410,7 +419,7 @@ public class JournalFilesRepository {
private void damagedFile(JournalFile file) throws Exception {
if (file.getFile().isOpen()) {
file.getFile().close(false);
file.getFile().close(false, false);
}
if (file.getFile().exists()) {
final Path journalPath = file.getFile().getJavaFile().toPath();
@ -529,9 +538,9 @@ public class JournalFilesRepository {
}
}
public void closeFile(final JournalFile file) throws Exception {
public void closeFile(final JournalFile file, boolean block) throws Exception {
fileFactory.deactivateBuffer();
file.getFile().close();
file.getFile().close(true, block);
if (!dataFiles.contains(file)) {
// This is not a retry from openFile
// If you don't check this then retries keep adding the same file into
@ -659,7 +668,7 @@ public class JournalFilesRepository {
long position = sequentialFile.position();
sequentialFile.close();
sequentialFile.close(false, false);
if (logger.isTraceEnabled()) {
logger.trace("Renaming file " + tmpFileName + " as " + fileName);
@ -724,7 +733,7 @@ public class JournalFilesRepository {
sf.position(position);
sf.close();
sf.close(false, false);
return jf;
}

View File

@ -796,7 +796,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
try {
file.getFile().close(false);
file.getFile().close(false, false);
} catch (Throwable ignored) {
}
}
@ -1671,7 +1671,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
compactorLock.writeLock().lock();
try {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount());
ArrayList<JournalFile> dataFilesToProcess;
boolean previousReclaimValue = isAutoReclaim();
@ -1682,45 +1682,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
journalLock.writeLock().lock();
try {
if (state != JournalState.LOADED) {
return;
}
dataFilesToProcess = getDataListToProcess();
onCompactLockingTheJournal();
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile(false);
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(filesRepository.getDataFiles());
filesRepository.clearDataFiles();
if (dataFilesToProcess.size() == 0) {
logger.trace("Finishing compacting, nothing to process");
return;
}
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
transactions.forEach((id, pendingTransaction) -> {
compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
pendingTransaction.setCompacting();
});
// We will calculate the new records during compacting, what will take the position the records will take
// after compacting
records.clear();
} finally {
journalLock.writeLock().unlock();
}
if (dataFilesToProcess == null)
return;
Collections.sort(dataFilesToProcess, JOURNAL_FILE_COMPARATOR);
@ -1849,6 +1814,57 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
/** this private method will return a list of data files that need to be cleaned up.
* It will get the list, and replace it on the journal structure, while a separate thread would be able
* to read it, and append to a new list that will be replaced on the journal. */
private ArrayList<JournalFile> getDataListToProcess() throws Exception {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount());
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
journalLock.writeLock().lock();
try {
if (state != JournalState.LOADED) {
return null;
}
onCompactLockingTheJournal();
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile(false, true);
// Take the snapshots and replace the structures
dataFilesToProcess.addAll(filesRepository.getDataFiles());
filesRepository.clearDataFiles();
if (dataFilesToProcess.size() == 0) {
logger.trace("Finishing compacting, nothing to process");
return null;
}
compactor = new JournalCompactor(fileFactory, this, filesRepository, records.keysLongHashSet(), dataFilesToProcess.get(0).getFileID());
transactions.forEach((id, pendingTransaction) -> {
compactor.addPendingTransaction(id, pendingTransaction.getPositiveArray());
pendingTransaction.setCompacting();
});
// We will calculate the new records during compacting, what will take the position the records will take
// after compacting
records.clear();
} finally {
journalLock.writeLock().unlock();
}
for (JournalFile file : dataFilesToProcess) {
file.getFile().waitNotPending();
}
return dataFilesToProcess;
}
/**
* <p>Load data accordingly to the record layouts</p>
* <p>Basic record layout:</p>
@ -2505,7 +2521,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
debugWait();
journalLock.writeLock().lock();
try {
moveNextFile(false);
moveNextFile(false, true);
} finally {
journalLock.writeLock().unlock();
}
@ -2586,7 +2602,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
fileFactory.deactivateBuffer();
if (currentFile != null && currentFile.getFile().isOpen()) {
currentFile.getFile().close();
currentFile.getFile().close(true, true);
}
filesRepository.clear();
@ -3155,7 +3171,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
try {
if (!currentFile.getFile().fits(size)) {
moveNextFile(true);
moveNextFile(true, false);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size)) {
@ -3215,8 +3231,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/**
* You need to guarantee lock.acquire() before calling this method!
*/
protected void moveNextFile(final boolean scheduleReclaim) throws Exception {
filesRepository.closeFile(currentFile);
protected void moveNextFile(final boolean scheduleReclaim, boolean blockOnClose) throws Exception {
filesRepository.closeFile(currentFile, blockOnClose);
currentFile = filesRepository.openFile();

View File

@ -508,7 +508,7 @@ public final class Page implements Comparable<Page> {
// leave it to the soft cache to decide when to release it now
pageCache = null;
}
file.close(waitSync);
file.close(waitSync, waitSync);
Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {

View File

@ -592,7 +592,7 @@ public class PagingStoreImpl implements PagingStore {
file.position(0);
file.close(false);
file.close(false, false);
return page;
}

View File

@ -201,7 +201,7 @@ public class LargeBody {
} else {
SequentialFile tmpFile = createFile();
bodySize = tmpFile.size();
tmpFile.close(false);
tmpFile.close(false, false);
}
}
return bodySize;
@ -274,7 +274,7 @@ public class LargeBody {
throw new RuntimeException(e);
} finally {
try {
file.close(false);
file.close(false, false);
} catch (Exception ignored) {
}
}
@ -296,7 +296,7 @@ public class LargeBody {
} finally {
if (closeFile) {
try {
file.close(false);
file.close(false, false);
} catch (Exception ignored) {
}
}
@ -309,7 +309,7 @@ public class LargeBody {
if (sync) {
file.sync();
}
file.close(false);
file.close(false, false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
}
@ -388,7 +388,7 @@ public class LargeBody {
public void open() throws ActiveMQException {
try {
if (cFile != null && cFile.isOpen()) {
cFile.close(false);
cFile.close(false, false);
}
cFile = getReadingFile();
cFile.open();
@ -415,7 +415,7 @@ public class LargeBody {
public void close() throws ActiveMQException {
try {
if (cFile != null) {
cFile.close(false);
cFile.close(false, false);
cFile = null;
}
} catch (Exception e) {

View File

@ -99,8 +99,8 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
int fileSize) {
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
@Override
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
super.moveNextFile(scheduleReclaim);
protected void moveNextFile(boolean scheduleReclaim, boolean block) throws Exception {
super.moveNextFile(scheduleReclaim, block);
if (blocked.get()) {
throw new IllegalStateException("forcibly down");
}

View File

@ -349,8 +349,8 @@ public class SharedNothingReplicationFlowControlTest extends ActiveMQTestBase {
}
@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
super.close(waitSync);
public synchronized void close(boolean waitSync, boolean block) throws IOException, InterruptedException, ActiveMQException {
super.close(waitSync, block);
openFiles.remove(TestableSequentialFile.this);
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.util;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.AutomaticLatch;
import org.junit.Assert;
import org.junit.Test;
public class AutomaticLatchTest {
@Test
public void testWthPending() {
AtomicInteger value = new AtomicInteger(0);
AutomaticLatch latch = new AutomaticLatch(1);
latch.afterCompletion(() -> value.incrementAndGet());
Assert.assertEquals(0, value.get());
latch.countDown();
Assert.assertEquals(1, value.get());
}
@Test
public void testWthoutPending() {
AtomicInteger value = new AtomicInteger(0);
AutomaticLatch latch = new AutomaticLatch(0);
latch.afterCompletion(() -> value.incrementAndGet());
Assert.assertEquals(1, value.get());
latch.countUp();
latch.countDown();
// the previous latch completion should been cleared by now
Assert.assertEquals(1, value.get());
latch.afterCompletion(() -> value.addAndGet(10));
Assert.assertEquals(11, value.get());
latch.countUp();
latch.countDown();
Assert.assertEquals(11, value.get());
}
@Test
public void testMultipleCallsOrder() {
ArrayList<Integer> outcome = new ArrayList<>();
AutomaticLatch latch = new AutomaticLatch(1);
latch.afterCompletion(() -> outcome.add(0));
latch.afterCompletion(() -> outcome.add(1));
latch.afterCompletion(() -> outcome.add(2));
latch.countDown();
Assert.assertEquals((Integer)0, outcome.get(0));
Assert.assertEquals((Integer)1, outcome.get(1));
Assert.assertEquals((Integer)2, outcome.get(2));
Assert.assertEquals(3, outcome.size());
}
}