diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java index 844f9f09a3..9bfa3e6345 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java @@ -25,6 +25,10 @@ public interface CriticalAnalyzer extends ActiveMQComponent { default void clear() { } + default int getNumberOfComponents() { + return 0; + } + boolean isMeasuring(); void add(CriticalComponent component); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java index 6a9a0dd3c9..b7676491b3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java @@ -66,6 +66,11 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { private final ConcurrentHashSet components = new ConcurrentHashSet<>(); + @Override + public int getNumberOfComponents() { + return components.size(); + } + @Override public boolean isMeasuring() { return true; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index b9ea6a830f..1d7a017085 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -63,6 +63,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac protected final IOCriticalErrorListener critialErrorListener; + protected final CriticalAnalyzer criticalAnalyzer; + /** * Asynchronous writes need to be done at another executor. * This needs to be done at NIO, or else we would have the callers thread blocking for the return. @@ -84,6 +86,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac criticalAnalyzer = EmptyCriticalAnalyzer.getInstance(); } + this.criticalAnalyzer = criticalAnalyzer; + if (buffered && bufferTimeout > 0) { timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates); criticalAnalyzer.add(timedBuffer); @@ -96,6 +100,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac this.maxIO = maxIO; } + @Override + public CriticalAnalyzer getCriticalAnalyzer() { + return criticalAnalyzer; + } + @Override public long getBufferSize() { return bufferSize; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index f40a6c408a..a724ad342f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -20,11 +20,17 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.List; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; + /** * A SequentialFileFactory */ public interface SequentialFileFactory { + default CriticalAnalyzer getCriticalAnalyzer() { + return null; + } + SequentialFile createSequentialFile(String fileName); int getMaxIO(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index e0fe149eb7..41137d9d0f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -38,8 +38,13 @@ import org.jboss.logging.Logger; public final class TimedBuffer extends CriticalComponentImpl { - protected static final int CRITICAL_PATHS = 1; + protected static final int CRITICAL_PATHS = 6; protected static final int CRITICAL_PATH_FLUSH = 0; + protected static final int CRITICAL_PATH_STOP = 1; + protected static final int CRITICAL_PATH_START = 2; + protected static final int CRITICAL_PATH_CHECK_SIZE = 3; + protected static final int CRITICAL_PATH_ADD_BYTES = 4; + protected static final int CRITICAL_PATH_SET_OBSERVER = 5; private static final Logger logger = Logger.getLogger(TimedBuffer.class); @@ -120,7 +125,6 @@ public final class TimedBuffer extends CriticalComponentImpl { //direct ByteBuffers with no Cleaner! buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size))); - buffer.clear(); bufferLimit = 0; @@ -130,67 +134,91 @@ public final class TimedBuffer extends CriticalComponentImpl { this.timeout = timeout; } - public synchronized void start() { - if (started) { - return; - } - - // Need to start with the spin limiter acquired + public void start() { + enterCritical(CRITICAL_PATH_START); try { - spinLimiter.acquire(); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); + synchronized (this) { + if (started) { + return; + } + + // Need to start with the spin limiter acquired + try { + spinLimiter.acquire(); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + + timerRunnable = new CheckTimer(); + + timerThread = new Thread(timerRunnable, "activemq-buffer-timeout"); + + timerThread.start(); + + if (logRates) { + logRatesTimerTask = new LogRatesTimerTask(); + + logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000); + } + + started = true; + } + } finally { + leaveCritical(CRITICAL_PATH_START); } - - timerRunnable = new CheckTimer(); - - timerThread = new Thread(timerRunnable, "activemq-buffer-timeout"); - - timerThread.start(); - - if (logRates) { - logRatesTimerTask = new LogRatesTimerTask(); - - logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000); - } - - started = true; } public void stop() { - if (!started) { - return; - } + enterCritical(CRITICAL_PATH_STOP); + try { + // add critical analyzer here.... <<<< + synchronized (this) { + try { + if (!started) { + return; + } - flush(); + flush(); - bufferObserver = null; + bufferObserver = null; - timerRunnable.close(); + timerRunnable.close(); - spinLimiter.release(); + spinLimiter.release(); - if (logRates) { - logRatesTimerTask.cancel(); - } + if (logRates) { + logRatesTimerTask.cancel(); + } - while (timerThread.isAlive()) { - try { - timerThread.join(); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); + while (timerThread.isAlive()) { + try { + timerThread.join(); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + } finally { + started = false; + } } + } finally { + leaveCritical(CRITICAL_PATH_STOP); } - - started = false; } - public synchronized void setObserver(final TimedBufferObserver observer) { - if (bufferObserver != null) { - flush(); - } + public void setObserver(final TimedBufferObserver observer) { + enterCritical(CRITICAL_PATH_SET_OBSERVER); + try { + synchronized (this) { + if (bufferObserver != null) { + flush(); + } - bufferObserver = observer; + bufferObserver = observer; + } + } finally { + leaveCritical(CRITICAL_PATH_SET_OBSERVER); + } } /** @@ -198,81 +226,101 @@ public final class TimedBuffer extends CriticalComponentImpl { * * @param sizeChecked */ - public synchronized boolean checkSize(final int sizeChecked) { - if (!started) { - throw new IllegalStateException("TimedBuffer is not started"); - } + public boolean checkSize(final int sizeChecked) { + enterCritical(CRITICAL_PATH_CHECK_SIZE); + try { + synchronized (this) { + if (!started) { + throw new IllegalStateException("TimedBuffer is not started"); + } - if (sizeChecked > bufferSize) { - throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + - ") on the journal"); - } + if (sizeChecked > bufferSize) { + throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize + ") on the journal"); + } - if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) { - // Either there is not enough space left in the buffer for the sized record - // Or a flush has just been performed and we need to re-calculate bufferLimit + if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit) { + // Either there is not enough space left in the buffer for the sized record + // Or a flush has just been performed and we need to re-calculate bufferLimit - flush(); + flush(); - delayFlush = true; + delayFlush = true; - final int remainingInFile = bufferObserver.getRemainingBytes(); + final int remainingInFile = bufferObserver.getRemainingBytes(); - if (sizeChecked > remainingInFile) { - return false; - } else { - // There is enough space in the file for this size + if (sizeChecked > remainingInFile) { + return false; + } else { + // There is enough space in the file for this size - // Need to re-calculate buffer limit + // Need to re-calculate buffer limit - bufferLimit = Math.min(remainingInFile, bufferSize); + bufferLimit = Math.min(remainingInFile, bufferSize); - return true; + return true; + } + } else { + delayFlush = true; + + return true; + } } - } else { - delayFlush = true; - - return true; + } finally { + leaveCritical(CRITICAL_PATH_CHECK_SIZE); } } - public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) { - if (!started) { - throw new IllegalStateException("TimedBuffer is not started"); - } + public void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) { + enterCritical(CRITICAL_PATH_ADD_BYTES); + try { + synchronized (this) { + if (!started) { + throw new IllegalStateException("TimedBuffer is not started"); + } - delayFlush = false; + delayFlush = false; - //it doesn't modify the reader index of bytes as in the original version - final int readableBytes = bytes.readableBytes(); - final int writerIndex = buffer.writerIndex(); - buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes); - buffer.writerIndex(writerIndex + readableBytes); + //it doesn't modify the reader index of bytes as in the original version + final int readableBytes = bytes.readableBytes(); + final int writerIndex = buffer.writerIndex(); + buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes); + buffer.writerIndex(writerIndex + readableBytes); - callbacks.add(callback); + callbacks.add(callback); - if (sync) { - pendingSync = true; + if (sync) { + pendingSync = true; - startSpin(); + startSpin(); + } + } + } finally { + leaveCritical(CRITICAL_PATH_ADD_BYTES); } } - public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { - if (!started) { - throw new IllegalStateException("TimedBuffer is not started"); - } + public void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { + enterCritical(CRITICAL_PATH_ADD_BYTES); + try { + synchronized (this) { + if (!started) { + throw new IllegalStateException("TimedBuffer is not started"); + } - delayFlush = false; + delayFlush = false; - bytes.encode(buffer); + bytes.encode(buffer); - callbacks.add(callback); + callbacks.add(callback); - if (sync) { - pendingSync = true; + if (sync) { + pendingSync = true; - startSpin(); + startSpin(); + } + } + } finally { + leaveCritical(CRITICAL_PATH_ADD_BYTES); } } @@ -287,13 +335,13 @@ public final class TimedBuffer extends CriticalComponentImpl { * @return {@code true} when are flushed any bytes, {@code false} otherwise */ public boolean flushBatch() { - synchronized (this) { - if (!started) { - throw new IllegalStateException("TimedBuffer is not started"); - } + enterCritical(CRITICAL_PATH_FLUSH); + try { + synchronized (this) { + if (!started) { + throw new IllegalStateException("TimedBuffer is not started"); + } - enterCritical(CRITICAL_PATH_FLUSH); - try { if (!delayFlush && buffer.writerIndex() > 0) { int pos = buffer.writerIndex(); @@ -326,9 +374,9 @@ public final class TimedBuffer extends CriticalComponentImpl { } else { return false; } - } finally { - leaveCritical(CRITICAL_PATH_FLUSH); } + } finally { + leaveCritical(CRITICAL_PATH_FLUSH); } } @@ -452,7 +500,6 @@ public final class TimedBuffer extends CriticalComponentImpl { failedChecks++; } - if (++checks >= MAX_CHECKS_ON_SLEEP) { if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) { logger.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 6dc45c087e..6defb1eb85 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; @@ -70,6 +71,10 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { return Long.MAX_VALUE; } + default SequentialFileFactory getJournalSequentialFileFactory() { + return null; + } + void criticalError(Throwable error); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 160d12daa8..87f4fc9982 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -114,6 +114,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { super(config, analyzer, executorFactory, null, ioExecutors, criticalErrorListener); } + @Override + public SequentialFileFactory getJournalSequentialFileFactory() { + return journalFF; + } + @Override protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java index 156edd73b5..d96cb3cd9f 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/CriticalAnalyzerFaultInjectionTestBase.java @@ -37,7 +37,7 @@ public abstract class CriticalAnalyzerFaultInjectionTestBase extends JMSTestBase private static long CHECK_PERIOD = 100; private static long TIMEOUT = 3000; - private SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress"); + protected SimpleString address = SimpleString.toSimpleString("faultInjectionTestAddress"); private Thread t; diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java index b14b3e6b9b..d5cfe49683 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/critical/analyzer/FileSystemSyncBlockedTest.java @@ -16,9 +16,15 @@ */ package org.apache.activemq.artemis.tests.extras.byteman.critical.analyzer; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,4 +52,26 @@ public class FileSystemSyncBlockedTest extends CriticalAnalyzerFaultInjectionTes public void testSlowDiskSync() throws Exception { testSendDurableMessage(); } + + @Test + public void testManyFiles() throws Exception + { + Session s = conn.createSession(true, Session.SESSION_TRANSACTED); + + Queue jmsQueue = s.createQueue(address.toString()); + MessageProducer p = s.createProducer(jmsQueue); + p.setDeliveryMode(DeliveryMode.PERSISTENT); + conn.start(); + for (int i = 0; i < 1000; i++) + { + p.send(s.createTextMessage("payload")); + server.getStorageManager().getMessageJournal().forceMoveNextFile(); + } + s.commit(); + + // if you have more than 100 components, then you have a leak! + Assert.assertTrue(server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents() < 10); + System.out.println("Number of components:" + server.getStorageManager().getJournalSequentialFileFactory().getCriticalAnalyzer().getNumberOfComponents()); + + } }