ARTEMIS-3271 Improve Critical Analyzer to use AutoCloseable on the API

This commit is contained in:
Clebert Suconic 2021-04-28 15:04:46 -04:00 committed by clebertsuconic
parent a3295c9873
commit d2676e77f8
21 changed files with 551 additions and 669 deletions

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
public interface ArtemisCloseable extends AutoCloseable {
/** The main purpose of this interface is to hide the exception since it is not needed. */
@Override
void close();
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.critical;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
public interface CriticalCloseable extends ArtemisCloseable {
/** This will set something to be called right before closing.
*
* The use case that drove this call was a ReadWriteLock on the journal.
* Imagine that you need to call enterCritical, readWrite.lock() and then unlock and leaveCritical.
* By using this call I could reuse the same instance on the readWriteLock. */
void beforeClose(ArtemisCloseable otherCloseable);
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.utils.critical;
/**
* A Critical component enters and leaves a critical state.
* You update a long every time you enter a critical path
@ -27,26 +26,9 @@ package org.apache.activemq.artemis.utils.critical;
*/
public interface CriticalComponent {
default CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
/**
* please save the time you entered here.
* Use volatile variables.
* No locks through anywhere.
*/
default void enterCritical(int path) {
// I'm providing a default as some components may chose to calculate it themselves
}
CriticalAnalyzer getCriticalAnalyzer();
/**
* please save the time you entered here
* Use volatile variables.
* No locks through anywhere.
*/
default void leaveCritical(int path) {
// I'm providing a default as some components may chose to calculate it themselves
}
CriticalCloseable measureCritical(int path);
/**
* Check if the component is expired at a given timeout.. on any of its paths.

View File

@ -47,17 +47,11 @@ public class CriticalComponentImpl implements CriticalComponent {
}
@Override
public void enterCritical(int path) {
if (analyzer.isMeasuring()) {
measures[path].enterCritical();
}
}
@Override
public void leaveCritical(int path) {
if (analyzer.isMeasuring()) {
measures[path].leaveCritical();
public CriticalCloseable measureCritical(int path) {
if (!analyzer.isMeasuring()) {
return CriticalMeasure.dummyCloseable;
} else {
return measures[path].measure();
}
}

View File

@ -17,24 +17,58 @@
package org.apache.activemq.artemis.utils.critical;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.jboss.logging.Logger;
public class CriticalMeasure {
public static boolean isDummy(ArtemisCloseable closeable) {
return closeable == dummyCloseable;
}
private static final Logger logger = Logger.getLogger(CriticalMeasure.class);
// this is used on enterCritical, if the logger is in trace mode
private volatile Exception traceEnter;
static final AtomicIntegerFieldUpdater<CriticalMeasure> CURRENT_MEASURING = AtomicIntegerFieldUpdater.newUpdater(CriticalMeasure.class, "measuring");
static final AtomicReferenceFieldUpdater<CriticalMeasure, Thread> CURRENT_THREAD_UDPATER = AtomicReferenceFieldUpdater.newUpdater(CriticalMeasure.class, Thread.class, "currentThread");
private final CriticalCloseable autoCloseable = new CriticalCloseable() {
ArtemisCloseable beforeClose;
// While resetting the leaveMethod, I want to make sure no enter call would reset the value.
// so I set the Current Thread to this Ghost Thread, to then set it back to null
private static final Thread GHOST_THREAD = new Thread();
@Override
public void beforeClose(ArtemisCloseable closeable) {
beforeClose = closeable;
}
private volatile Thread currentThread;
@Override
public void close() {
try {
if (beforeClose != null) {
beforeClose.close();
beforeClose = null;
}
} finally {
leaveCritical();
CURRENT_MEASURING.set(CriticalMeasure.this, 0);
}
}
};
protected static final CriticalCloseable dummyCloseable = new CriticalCloseable() {
@Override
public void beforeClose(ArtemisCloseable runnable) {
throw new IllegalStateException("The dummy closeable does not support beforeClose. Check before CriticalMeasure.isDummy(closeable) before you call beforeClose(runnable)");
}
@Override
public void close() {
}
};
// this is working like a boolean, although using AtomicIntegerFieldUpdater instead
protected volatile int measuring;
protected volatile long timeEnter;
private final int id;
@ -46,45 +80,43 @@ public class CriticalMeasure {
this.timeEnter = 0;
}
public void enterCritical() {
// a sampling of a single thread at a time will be sufficient for the analyser,
// typically what causes one thread to stall will repeat on another
if (CURRENT_THREAD_UDPATER.compareAndSet(this, null, Thread.currentThread())) {
timeEnter = System.nanoTime();
if (logger.isTraceEnabled()) {
traceEnter = new Exception("entered");
}
public CriticalCloseable measure() {
// I could have chosen to simply store the time on this value, however I would be calling nanoTime a lot of times
// to just waste the value
// So, I keep a measuring atomic to protect the thread sampling,
// and I will still do the set using a single thread.
if (CURRENT_MEASURING.compareAndSet(this, 0, 1)) {
enterCritical();
return autoCloseable;
} else {
return dummyCloseable;
}
}
public void leaveCritical() {
protected void enterCritical() {
timeEnter = System.nanoTime();
if (CURRENT_THREAD_UDPATER.compareAndSet(this, Thread.currentThread(), GHOST_THREAD)) {
// NULL_THREAD here represents a state where I would be ignoring any call to enterCritical or leaveCritical, while I reset the Time Enter Update
// This is to avoid replacing time Enter by a new Value, right after current Thread is set to Null.
// So we set to this ghost value while we are setting
if (logger.isTraceEnabled()) {
CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null;
if (analyzer != null) {
long nanoTimeout = analyzer.getTimeoutNanoSeconds();
if (checkExpiration(nanoTimeout, false)) {
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left"));
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter);
}
}
traceEnter = null;
}
this.timeEnter = 0;
// I am pretty sure this is single threaded by now.. I don't need compareAndSet here
CURRENT_THREAD_UDPATER.set(this, null);
if (logger.isTraceEnabled()) {
traceEnter = new Exception("entered");
}
}
protected void leaveCritical() {
if (logger.isTraceEnabled()) {
CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null;
if (analyzer != null) {
long nanoTimeout = analyzer.getTimeoutNanoSeconds();
if (checkExpiration(nanoTimeout, false)) {
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left"));
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter);
}
}
traceEnter = null;
}
timeEnter = 0L;
}
protected String getComponentName() {
if (component == null) {
return "null";
@ -94,8 +126,8 @@ public class CriticalMeasure {
}
public boolean checkExpiration(long timeout, boolean reset) {
final long timeEnter = this.timeEnter;
if (timeEnter != 0L) {
final long thisTimeEnter = this.timeEnter;
if (thisTimeEnter != 0L) {
long time = System.nanoTime();
boolean expired = time - timeEnter > timeout;

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.utils.critical;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
@ -41,10 +43,75 @@ public class CriticalAnalyzerTest {
}
}
@Test
public void testDummy() {
analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
analyzer.add(component);
CriticalCloseable closeable1 = component.measureCritical(0);
Assert.assertFalse(CriticalMeasure.isDummy(closeable1));
ArtemisCloseable closeable2 = component.measureCritical(0);
Assert.assertTrue(CriticalMeasure.isDummy(closeable2));
closeable1.close();
closeable2 = component.measureCritical(0);
Assert.assertFalse(CriticalMeasure.isDummy(closeable2));
}
@Test
public void testCall() {
analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
analyzer.add(component);
CriticalCloseable closeable = component.measureCritical(0);
Assert.assertFalse(CriticalMeasure.isDummy(closeable));
CriticalCloseable dummy = component.measureCritical(0);
boolean exception = false;
try {
dummy.beforeClose(() -> System.out.println("never hapening"));
} catch (Throwable e) {
exception = true;
}
Assert.assertTrue(exception);
AtomicInteger value = new AtomicInteger(0);
closeable.beforeClose(() -> value.set(1000));
Assert.assertEquals(0, value.get());
closeable.close();
Assert.assertEquals(1000, value.get());
}
@Test
public void testAction() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
analyzer.add(new CriticalComponent() {
@Override
public CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
@Override
public CriticalCloseable measureCritical(int path) {
return null;
}
@Override
public boolean checkExpiration(long timeout, boolean reset) {
return true;
@ -77,9 +144,8 @@ public class CriticalAnalyzerTest {
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
analyzer.add(component);
component.enterCritical(0);
component.leaveCritical(0);
component.enterCritical(1);
component.measureCritical(0).close();
component.measureCritical(1);
analyzer.start();
@ -93,7 +159,7 @@ public class CriticalAnalyzerTest {
public void testEnterNoLeaveNoExpire() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
component.enterCritical(0);
component.measureCritical(0);
Assert.assertFalse(component.checkExpiration(TimeUnit.MINUTES.toNanos(1), false));
analyzer.stop();
@ -103,7 +169,7 @@ public class CriticalAnalyzerTest {
public void testEnterNoLeaveExpire() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
component.enterCritical(0);
component.measureCritical(0);
Thread.sleep(50);
Assert.assertTrue(component.checkExpiration(0, false));
analyzer.stop();
@ -123,8 +189,7 @@ public class CriticalAnalyzerTest {
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
analyzer.add(component);
component.enterCritical(0);
component.leaveCritical(0);
component.measureCritical(0).close();
analyzer.start();
@ -146,14 +211,14 @@ public class CriticalAnalyzerTest {
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
analyzer.add(component);
component.enterCritical(0);
AutoCloseable measure = component.measureCritical(0);
Thread.sleep(50);
analyzer.start();
Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
component.leaveCritical(0);
measure.close();
latch.setCount(1);

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.utils.critical;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Test;
@ -38,7 +39,7 @@ public class CriticalMeasureTest {
CriticalComponent component = new CriticalComponentImpl(analyzer, 5);
CriticalMeasure measure = new CriticalMeasure(component, 1);
long time = System.nanoTime();
CriticalMeasure.CURRENT_THREAD_UDPATER.set(measure, Thread.currentThread());
measure.enterCritical();
measure.timeEnter = time - TimeUnit.MINUTES.toNanos(30);
measure.leaveCritical();
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
@ -50,13 +51,26 @@ public class CriticalMeasureTest {
CriticalComponent component = new CriticalComponentImpl(analyzer, 5);
CriticalMeasure measure = new CriticalMeasure(component, 1);
long time = System.nanoTime();
measure.enterCritical();
AutoCloseable closeable = measure.measure();
measure.timeEnter = time - TimeUnit.MINUTES.toNanos(5);
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); // on this call we should had a reset before
// subsequent call without reset should still fail
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), true));
// previous reset should have cleared it
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
measure.leaveCritical();
closeable.close();
}
@Test
public void testWithCloseable() throws Exception {
CriticalAnalyzer analyzer = new CriticalAnalyzerImpl();
CriticalComponent component = new CriticalComponentImpl(analyzer, 5);
CriticalMeasure measure = new CriticalMeasure(component, 1);
long time = System.nanoTime();
try (AutoCloseable theMeasure = component.measureCritical(0)) {
LockSupport.parkNanos(1000);
Assert.assertTrue(component.checkExpiration(100, false));
}
Assert.assertFalse(component.checkExpiration(100, false));
}
}

View File

@ -42,7 +42,7 @@ public class MultiThreadCriticalMeasureTest {
ReusableLatch latch = new ReusableLatch(0);
ReusableLatch latchOnMeasure = new ReusableLatch(0);
try {
CriticalMeasure measure = new CriticalMeasure((t, r) -> false, 0);
CriticalMeasure measure = new CriticalMeasure(null, 0);
CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
@ -56,9 +56,9 @@ public class MultiThreadCriticalMeasureTest {
latch.await();
}
measure.enterCritical();
latchOnMeasure.await();
measure.leaveCritical();
try (AutoCloseable closeable = measure.measure()) {
latchOnMeasure.await();
}
}
} catch (Throwable e) {
e.printStackTrace();

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.jboss.logging.Logger;
@ -56,46 +57,30 @@ public final class TimedBuffer extends CriticalComponentImpl {
private static final int MAX_CHECKS_ON_SLEEP = 20;
// Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver;
// If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
// in spinning and checking the time - and using up CPU in the process - this semaphore is used to
// prevent that
private final Semaphore spinLimiter = new Semaphore(1);
private CheckTimer timerRunnable;
private final int bufferSize;
private final ActiveMQBuffer buffer;
private int bufferLimit = 0;
private List<IOCallback> callbacks;
private final int timeout;
private final boolean logRates;
private final AtomicLong bytesFlushed = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0);
private TimedBufferObserver bufferObserver;
private CheckTimer timerRunnable;
private int bufferLimit = 0;
private List<IOCallback> callbacks;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile boolean pendingSync = false;
// for logging write rates
private Thread timerThread;
private volatile boolean started;
// We use this flag to prevent flush occurring between calling checkSize and addBytes
// CheckSize must always be followed by it's corresponding addBytes otherwise the buffer
// can get in an inconsistent state
private boolean delayFlush;
// for logging write rates
private final boolean logRates;
private final AtomicLong bytesFlushed = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0);
private Timer logRatesTimer;
private TimerTask logRatesTimerTask;
@ -135,8 +120,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
}
public void start() {
enterCritical(CRITICAL_PATH_START);
try {
try (ArtemisCloseable critical = measureCritical(CRITICAL_PATH_START)) {
synchronized (this) {
if (started) {
return;
@ -163,15 +147,12 @@ public final class TimedBuffer extends CriticalComponentImpl {
started = true;
}
} finally {
leaveCritical(CRITICAL_PATH_START);
}
}
public void stop() {
enterCritical(CRITICAL_PATH_STOP);
Thread localTimer = null;
try {
try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_STOP)) {
// add critical analyzer here.... <<<<
synchronized (this) {
try {
@ -210,14 +191,11 @@ public final class TimedBuffer extends CriticalComponentImpl {
}
}
}
} finally {
leaveCritical(CRITICAL_PATH_STOP);
}
}
public void setObserver(final TimedBufferObserver observer) {
enterCritical(CRITICAL_PATH_SET_OBSERVER);
try {
try (AutoCloseable measure = measureCritical(CRITICAL_PATH_SET_OBSERVER)) {
synchronized (this) {
if (bufferObserver != null) {
flush();
@ -225,8 +203,8 @@ public final class TimedBuffer extends CriticalComponentImpl {
bufferObserver = observer;
}
} finally {
leaveCritical(CRITICAL_PATH_SET_OBSERVER);
} catch (Exception shouldNotHappen) {
logger.debug(shouldNotHappen);
}
}
@ -236,8 +214,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
* @param sizeChecked
*/
public boolean checkSize(final int sizeChecked) {
enterCritical(CRITICAL_PATH_CHECK_SIZE);
try {
try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_CHECK_SIZE)) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
@ -274,14 +251,11 @@ public final class TimedBuffer extends CriticalComponentImpl {
return true;
}
}
} finally {
leaveCritical(CRITICAL_PATH_CHECK_SIZE);
}
}
public void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
enterCritical(CRITICAL_PATH_ADD_BYTES);
try {
try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_ADD_BYTES)) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
@ -303,14 +277,11 @@ public final class TimedBuffer extends CriticalComponentImpl {
startSpin();
}
}
} finally {
leaveCritical(CRITICAL_PATH_ADD_BYTES);
}
}
public void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
enterCritical(CRITICAL_PATH_ADD_BYTES);
try {
try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_ADD_BYTES)) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
@ -328,10 +299,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
startSpin();
}
}
} finally {
leaveCritical(CRITICAL_PATH_ADD_BYTES);
}
}
public void flush() {
@ -344,8 +312,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
* @return {@code true} when are flushed any bytes, {@code false} otherwise
*/
public boolean flushBatch() {
enterCritical(CRITICAL_PATH_FLUSH);
try {
try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_FLUSH)) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
@ -378,8 +345,6 @@ public final class TimedBuffer extends CriticalComponentImpl {
return false;
}
}
} finally {
leaveCritical(CRITICAL_PATH_FLUSH);
}
}
@ -391,6 +356,43 @@ public final class TimedBuffer extends CriticalComponentImpl {
// Inner classes -------------------------------------------------
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepNanos
*/
protected void sleep(long sleepNanos) {
LockSupport.parkNanos(sleepNanos);
}
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/
protected void stopSpin() {
if (spinning) {
try {
// We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
// when the buffer is inactive
spinLimiter.acquire();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
spinning = false;
}
}
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/
protected void startSpin() {
if (!spinning) {
spinLimiter.release();
spinning = true;
}
}
private class LogRatesTimerTask extends TimerTask {
private boolean closed;
@ -434,10 +436,9 @@ public final class TimedBuffer extends CriticalComponentImpl {
private class CheckTimer implements Runnable {
private volatile boolean closed = false;
int checks = 0;
int failedChecks = 0;
private volatile boolean closed = false;
@Override
public void run() {
@ -523,41 +524,4 @@ public final class TimedBuffer extends CriticalComponentImpl {
}
}
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepNanos
*/
protected void sleep(long sleepNanos) {
LockSupport.parkNanos(sleepNanos);
}
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/
protected void stopSpin() {
if (spinning) {
try {
// We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
// when the buffer is inactive
spinLimiter.acquire();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
spinning = false;
}
}
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/
protected void startSpin() {
if (!spinning) {
spinLimiter.release();
spinning = true;
}
}
}

View File

@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap;
import org.apache.activemq.artemis.utils.ThreadDumpUtil;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@ -438,80 +439,76 @@ public class PageCursorProviderImpl implements PageCursorProvider {
//
// I tried to simplify the locks but each PageStore has its own lock, so this was the best option
// I found in order to fix https://issues.apache.org/jira/browse/ARTEMIS-3054
storageManager.readLock();
try (ArtemisCloseable readLock = storageManager.closeableReadLock()) {
while (true) {
if (pagingStore.lock(100)) {
break;
while (true) {
if (pagingStore.lock(100)) {
break;
}
if (!pagingStore.isStarted())
return;
}
if (!pagingStore.isStarted())
return;
}
logger.tracef("%s locked", this);
synchronized (this) {
try {
if (!pagingStore.isStarted()) {
return;
}
if (pagingStore.getNumberOfPages() == 0) {
return;
}
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
long minPage = checkMinPage(cursorList);
deliverIfNecessary(cursorList, minPage);
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
// if the current page is being written...
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
boolean complete = checkPageCompletion(cursorList, minPage);
logger.tracef("%s locked", this);
synchronized (this) {
try {
if (!pagingStore.isStarted()) {
return;
}
// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
if (complete) {
if (pagingStore.getNumberOfPages() == 0) {
return;
}
cleanupComplete(cursorList);
}
}
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
if (!checkPageCompletion(cursorList, i)) {
break;
}
Page page = pagingStore.depage();
if (page == null) {
break;
}
depagedPages.add(page);
}
long minPage = checkMinPage(cursorList);
deliverIfNecessary(cursorList, minPage);
if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
pagingStore.stopPaging();
} else {
if (logger.isTraceEnabled()) {
logger.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() +
" as numberOfPages == " +
pagingStore.getNumberOfPages() +
" and currentPage.numberOfMessages = " +
pagingStore.getCurrentPage().getNumberOfMessages());
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
// if the current page is being written...
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
boolean complete = checkPageCompletion(cursorList, minPage);
if (!pagingStore.isStarted()) {
return;
}
// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
if (complete) {
cleanupComplete(cursorList);
}
}
for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
if (!checkPageCompletion(cursorList, i)) {
break;
}
Page page = pagingStore.depage();
if (page == null) {
break;
}
depagedPages.add(page);
}
if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
pagingStore.stopPaging();
} else {
if (logger.isTraceEnabled()) {
logger.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() + " as numberOfPages == " + pagingStore.getNumberOfPages() + " and currentPage.numberOfMessages = " + pagingStore.getCurrentPage().getNumberOfMessages());
}
}
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex, pagingStore.getAddress());
logger.warn(ex.getMessage(), ex);
return;
} finally {
pagingStore.unlock();
}
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex, pagingStore.getAddress());
logger.warn(ex.getMessage(), ex);
return;
} finally {
pagingStore.unlock();
storageManager.readUnLock();
}
}
finishCleanup(depagedPages);

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.jboss.logging.Logger;
/**
@ -254,8 +255,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
@Override
public void delete(Transaction tx) throws Exception {
// always lock the StorageManager first.
storage.readLock();
try {
try (ArtemisCloseable lock = storage.closeableReadLock()) {
synchronized (this) {
for (Long record : incrementRecords) {
storage.deleteIncrementRecord(tx.getID(), record.longValue());
@ -271,8 +271,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
value.set(0);
incrementRecords.clear();
}
} finally {
storage.readUnLock();
}
}

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.IDGenerator;
/**
@ -479,15 +480,9 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* say Paging classes, that use locks of their own AND also write through the StorageManager MUST
* first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock
* when starting replication sync.
*/
void readLock();
/**
* Unlock the manager.
*
* @see StorageManager#readLock()
*/
void readUnLock();
ArtemisCloseable closeableReadLock();
/**
* Closes the {@link IDGenerator} persisting the current record ID.

View File

@ -116,12 +116,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.CriticalMeasure;
import org.jboss.logging.Logger;
/**
@ -171,6 +174,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
// I would rather cache the Closeable instance here..
// I never know when the JRE decides to create a new instance on every call.
// So I'm playing safe here. That's all
protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock;
protected Journal messageJournal;
protected Journal bindingsJournal;
@ -340,12 +348,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
installLargeMessageConfirmationOnTX(tx, recordID);
messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, messageID));
} finally {
readUnLock();
}
}
@ -354,11 +359,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
*/
@Override
public void confirmPendingLargeMessage(long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, true, getContext());
} finally {
readUnLock();
}
}
@ -369,9 +371,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
}
readLock();
try {
// Note that we don't sync, the add reference that comes immediately after will sync if
try (ArtemisCloseable lock = closeableReadLock()) { // Note that we don't sync, the add reference that comes immediately after will sync if
// appropriate
if (message.isLargeMessage() && message instanceof LargeServerMessageImpl) {
@ -379,109 +379,100 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else {
messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false));
}
} finally {
readUnLock();
}
}
@Override
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public void readLock() {
enterCritical(CRITICAL_STORE);
public ArtemisCloseable closeableReadLock() {
CriticalCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();
if (CriticalMeasure.isDummy(measure)) {
// The next statement could have been called like this:
// return storageManagerLock.readLock()::unlock;
// However I wasn't 100% sure the JDK would take good care
// of caching for me.
// Since this is important to me here, I decided to play safe and
// cache it myself
return unlockCloseable;
} else {
// Same applies to the next statement here
// measure.beforeClose(storageManagerLock.readLock()::unlock);
// I'm just playing safe and caching it myself
measure.beforeClose(unlockCloseable);
return measure;
}
}
@Override
public void readUnLock() {
storageManagerLock.readLock().unlock();
leaveCritical(CRITICAL_STORE);
}
/** for internal use and testsuite, don't use it outside of tests */
/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}
/** for internal use and testsuite, don't use it outside of tests */
/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}
@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public boolean deleteMessage(final long messageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false));
} finally {
readUnLock();
}
}
@Override
public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID, JournalRecordIds.DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public void deleteDuplicateID(final long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@ -493,8 +484,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
}
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (message.isLargeMessage() && message instanceof LargeServerMessageImpl) {
// this is a core large message
messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message);
@ -502,19 +492,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message);
}
} finally {
readUnLock();
}
}
@Override
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
pageTransaction.setRecordID(generateID());
messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, pageTransaction);
} finally {
readUnLock();
}
}
@ -522,21 +507,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void updatePageTransaction(final long txID,
final PageTransactionInfo pageTransaction,
final int depages) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
} finally {
readUnLock();
}
}
@Override
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID));
} finally {
readUnLock();
}
}
@ -544,23 +523,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void storeAcknowledgeTransactional(final long txID,
final long queueID,
final long messageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID));
} finally {
readUnLock();
}
}
@Override
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecordTransactional(txID, ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
} finally {
readUnLock();
}
}
@ -578,11 +551,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txID, ackID);
} finally {
readUnLock();
}
}
@ -593,57 +563,40 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long id = generateID();
messageJournal.appendAddRecord(id, JournalRecordIds.HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true));
return id;
} finally {
readUnLock();
}
}
@Override
public void deleteHeuristicCompletion(final long id) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(id, true, getContext(true));
} finally {
readUnLock();
}
}
@Override
public void deletePageTransactional(final long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecord(recordID, false);
} finally {
readUnLock();
}
}
@Override
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception {
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecordTransactional(txID, ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding);
} finally {
readUnLock();
}
}
@Override
public void prepare(final long txID, final Xid xid) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
} finally {
readUnLock();
}
}
@ -665,8 +618,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void commit(final long txID, final boolean lineUpContext) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional) {
if (logger.isTraceEnabled()) {
@ -681,18 +633,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
*/
getContext(true).done();
}
} finally {
readUnLock();
}
}
@Override
public void rollback(final long txID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
} finally {
readUnLock();
}
}
@ -703,11 +650,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final long recordID) throws Exception {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding);
} finally {
readUnLock();
}
}
@ -718,21 +662,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final long recordID) throws Exception {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendUpdateRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding);
} finally {
readUnLock();
}
}
@Override
public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
} finally {
readUnLock();
}
}
@ -749,25 +687,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
} finally {
readUnLock();
}
}
@Override
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception {
deleteAddressSetting(addressSetting.getAddressMatch());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long id = idGenerator.generateID();
addressSetting.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.ADDRESS_SETTING_RECORD, addressSetting, true);
mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
} finally {
readUnLock();
}
}
@ -785,28 +717,22 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception {
deleteSecuritySetting(persistedRoles.getAddressMatch());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedRoles.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.SECURITY_SETTING_RECORD, persistedRoles, true);
mapPersistedSecuritySettings.put(persistedRoles.getAddressMatch(), persistedRoles);
} finally {
readUnLock();
}
}
@Override
public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception {
deleteDivertConfiguration(persistedDivertConfiguration.getName());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedDivertConfiguration.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.DIVERT_RECORD, persistedDivertConfiguration, true);
mapPersistedDivertConfigurations.put(persistedDivertConfiguration.getName(), persistedDivertConfiguration);
} finally {
readUnLock();
}
}
@ -814,11 +740,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void deleteDivertConfiguration(String divertName) throws Exception {
PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName);
if (oldDivert != null) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@ -831,14 +754,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
deleteUser(persistedUser.getUsername());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedUser.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.USER_RECORD, persistedUser, true);
mapPersistedUsers.put(persistedUser.getUsername(), persistedUser);
} finally {
readUnLock();
}
}
@ -846,11 +766,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void deleteUser(String username) throws Exception {
PersistedUser oldUser = mapPersistedUsers.remove(username);
if (oldUser != null) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldUser.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@ -863,14 +780,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void storeRole(PersistedRole persistedRole) throws Exception {
deleteRole(persistedRole.getUsername());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedRole.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.ROLE_RECORD, persistedRole, true);
mapPersistedRoles.put(persistedRole.getUsername(), persistedRole);
} finally {
readUnLock();
}
}
@ -878,11 +792,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void deleteRole(String username) throws Exception {
PersistedRole oldRole = mapPersistedRoles.remove(username);
if (oldRole != null) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldRole.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@ -894,21 +805,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void storeID(final long journalID, final long id) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendAddRecord(journalID, JournalRecordIds.ID_COUNTER_RECORD, BatchingIDGenerator.createIDEncodingSupport(id), true, getContext(true));
} finally {
readUnLock();
}
}
@Override
public void deleteID(long journalD) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(journalD, false);
} finally {
readUnLock();
}
}
@ -916,11 +821,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void deleteAddressSetting(SimpleString addressMatch) throws Exception {
PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
if (oldSetting != null) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@ -929,11 +831,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public void deleteSecuritySetting(SimpleString addressMatch) throws Exception {
PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch);
if (oldRoles != null) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
} finally {
readUnLock();
}
}
}
@ -954,9 +853,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
Set<PageTransactionInfo> invalidPageTransactions = new HashSet<>();
Map<Long, Message> messages = new HashMap<>();
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.setRemoveExtraFilesOnLoad(true);
JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
@ -976,13 +873,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
final MutableLong recordNumber = new MutableLong();
final CoreMessageObjectPools pools;
if (totalSize > 0) {
final int addresses = (int)Math.max(
DEFAULT_POOL_CAPACITY,
queueInfos == null ? 0 :
queueInfos.values().stream()
.map(QueueBindingInfo::getAddress)
.filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH)
.count() * 2);
final int addresses = (int) Math.max(DEFAULT_POOL_CAPACITY, queueInfos == null ? 0 : queueInfos.values().stream().map(QueueBindingInfo::getAddress).filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH).count() * 2);
pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128);
} else {
pools = null;
@ -1328,8 +1219,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
journalLoaded = true;
return info;
} finally {
readUnLock();
}
}
@ -1380,21 +1269,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void addGrouping(final GroupBinding groupBinding) throws Exception {
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendAddRecord(groupBinding.getId(), JournalRecordIds.GROUP_RECORD, groupingEncoding, true);
} finally {
readUnLock();
}
}
@Override
public void deleteGrouping(long tx, final GroupBinding groupBinding) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecordTransactional(tx, groupBinding.getId());
} finally {
readUnLock();
}
}
@ -1419,25 +1302,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (update) {
bindingsJournal.appendUpdateRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
} else {
bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding);
}
} finally {
readUnLock();
}
}
@Override
public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecordTransactional(tx, queueBindingID);
} finally {
readUnLock();
}
}
@ -1445,24 +1322,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception {
long recordID = idGenerator.generateID();
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true);
} finally {
readUnLock();
}
return recordID;
}
@Override
public void deleteQueueStatus(long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(recordID, true);
} finally {
readUnLock();
}
}
@ -1470,132 +1340,96 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception {
long recordID = idGenerator.generateID();
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendAddRecord(recordID, JournalRecordIds.ADDRESS_STATUS_RECORD, new AddressStatusEncoding(addressID, status), true);
} finally {
readUnLock();
}
return recordID;
}
@Override
public void deleteAddressStatus(long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecord(recordID, true);
} finally {
readUnLock();
}
}
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(),
addressInfo.getRoutingTypes(),
addressInfo.isAutoCreated());
PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated());
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = idGenerator.generateID();
bindingEncoding.setId(recordID);
addressInfo.setId(recordID);
bindingsJournal.appendAddRecordTransactional(tx, recordID, JournalRecordIds.ADDRESS_BINDING_RECORD, bindingEncoding);
} finally {
readUnLock();
}
}
@Override
public void deleteAddressBinding(long tx, final long addressBindingID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.appendDeleteRecordTransactional(tx, addressBindingID);
} finally {
readUnLock();
}
}
@Override
public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize));
return recordID;
} finally {
readUnLock();
}
}
@Override
public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize), true, getContext());
return recordID;
} finally {
readUnLock();
}
}
@Override
public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, persistentSize));
return recordID;
} finally {
readUnLock();
}
}
@Override
public long storePendingCounter(final long queueID, final long pageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
final long recordID = idGenerator.generateID();
PageCountPendingImpl pendingInc = new PageCountPendingImpl(queueID, pageID);
// We must guarantee the record sync before we actually write on the page otherwise we may get out of sync
// on the counter
messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true);
return recordID;
} finally {
readUnLock();
}
}
@Override
public void deleteIncrementRecord(long txID, long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
} finally {
readUnLock();
}
}
@Override
public void deletePageCounter(long txID, long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
} finally {
readUnLock();
}
}
@Override
public void deletePendingPageCounter(long txID, long recordID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
} finally {
readUnLock();
}
}
@ -1694,11 +1528,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public void lineUpContext() {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.lineUpContext(getContext());
} finally {
readUnLock();
}
}
@ -1787,15 +1618,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
* TODO: Is this still being used ?
*/
public JournalLoadInformation[] loadInternalOnly() throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
return info;
} finally {
readUnLock();
}
}
@ -2171,6 +1999,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
persistedRole.setStoreId(id);
return persistedRole;
}
/**
* @param id
* @param buffer
@ -2221,10 +2050,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
@Override
public boolean addToPage(PagingStore store,
Message msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to

View File

@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -253,14 +254,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception {
try {
enterCritical(CRITICAL_STOP);
try (ArtemisCloseable critical = measureCritical(CRITICAL_STOP)) {
synchronized (this) {
if (internalStop(ioCriticalError, sendFailover))
return;
}
} finally {
leaveCritical(CRITICAL_STOP);
}
}
@ -290,39 +288,39 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// that's ok
}
enterCritical(CRITICAL_STOP_2);
storageManagerLock.writeLock().lock();
try {
try (ArtemisCloseable critical = measureCritical(CRITICAL_STOP_2)) {
storageManagerLock.writeLock().lock();
try {
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
// We cache the variable as the replicator could be changed between here and the time we call stop
// since sendLiveIsStopping may issue a close back from the channel
// and we want to ensure a stop here just in case
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse != null) {
if (sendFailover) {
final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER);
if (token != null) {
try {
token.waitCompletion(5000);
} catch (Exception e) {
// ignore it
}
}
}
// we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
// while the backup will never receive then
replicatorInUse.stop(false);
}
// we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown
// while the backup will never receive then
replicatorInUse.stop(false);
bindingsJournal.stop();
messageJournal.stop();
journalLoaded = false;
started = false;
} finally {
storageManagerLock.writeLock().unlock();
}
bindingsJournal.stop();
messageJournal.stop();
journalLoaded = false;
started = false;
} finally {
storageManagerLock.writeLock().unlock();
leaveCritical(CRITICAL_STOP_2);
}
return false;
}
@ -388,12 +386,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public void pageClosed(final SimpleString storeName, final int pageNumber) {
if (isReplicated()) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated())
replicator.pageClosed(storeName, pageNumber);
} finally {
readUnLock();
}
}
}
@ -401,12 +396,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public void pageDeleted(final SimpleString storeName, final int pageNumber) {
if (isReplicated()) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated())
replicator.pageDeleted(storeName, pageNumber);
} finally {
readUnLock();
}
}
}
@ -420,12 +412,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// Say you are sending durable and non-durable messages to a page
// The ACKs would be done to wrong positions, and the backup would be a mess
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated())
replicator.pageWrite(message, pageNumber);
} finally {
readUnLock();
}
}
}
@ -441,26 +430,20 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
public long storePendingLargeMessage(final long messageID) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = generateID();
messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true));
return recordID;
} finally {
readUnLock();
}
}
@Override
public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated()) {
replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
}
} finally {
readUnLock();
}
}
@ -485,22 +468,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
if (largeServerMessage.toMessage().isDurable() && isReplicated()) {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated() && replicator.isSynchronizing()) {
largeMessagesToDelete.put(largeServerMessage.toMessage().getMessageID(), largeServerMessage);
return;
}
} finally {
readUnLock();
}
}
Runnable deleteAction = new Runnable() {
@Override
public void run() {
try {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (replicator != null) {
replicator.largeMessageDelete(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this);
}
@ -508,8 +487,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
// The confirm could only be done after the actual delete is done
confirmLargeMessage(largeServerMessage);
} finally {
readUnLock();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessage.toMessage().getMessageID());
@ -542,8 +519,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override
public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
if (isReplicated()) {
replicator.largeMessageBegin(id);
}
@ -553,8 +529,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.moveHeadersAndProperties(message);
return largeMessageCreated(id, largeMessage);
} finally {
readUnLock();
}
}
@ -865,8 +839,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final ActiveMQBuffer bytes) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
file.position(file.size());
if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) {
final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
@ -883,8 +856,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bytes.readBytes(bytesCopy);
addBytesToLargeMessage(file, messageId, bytesCopy);
}
} finally {
readUnLock();
}
}
@ -892,8 +863,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
public final void addBytesToLargeMessage(final SequentialFile file,
final long messageId,
final byte[] bytes) throws Exception {
readLock();
try {
try (ArtemisCloseable lock = closeableReadLock()) {
file.position(file.size());
//that's an additional precaution to avoid ByteBuffer to be pooled:
//NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident
@ -903,8 +873,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
if (isReplicated()) {
replicator.largeMessageWrite(messageId, bytes);
}
} finally {
readUnLock();
}
}

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
public class NullStorageManager implements StorageManager {
@ -78,6 +79,13 @@ public class NullStorageManager implements StorageManager {
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
private static final ArtemisCloseable dummy = () -> { };
@Override
public ArtemisCloseable closeableReadLock() {
return dummy;
}
public NullStorageManager() {
this(new IOCriticalErrorListener() {
@Override
@ -682,16 +690,6 @@ public class NullStorageManager implements StorageManager {
// no-op
}
@Override
public void readLock() {
// no-op
}
@Override
public void readUnLock() {
// no-op
}
@Override
public void persistIdGenerator() {
// no-op

View File

@ -100,6 +100,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.BooleanUtil;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -1087,9 +1088,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final MessageReference ref, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
if (ringSize != -1) {
enforceRing(ref, scheduling, true);
}
@ -1103,8 +1103,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
directDeliver = false;
}
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
@ -1112,9 +1110,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
if (ringSize != -1) {
enforceRing(ref, false, true);
}
@ -1127,8 +1124,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
directDeliver = false;
}
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
@ -1136,9 +1131,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
for (MessageReference ref : refs) {
addHead(ref, scheduling);
}
@ -1146,8 +1140,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
resetAllIterators();
deliverAsync();
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
@ -1155,9 +1147,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final List<MessageReference> refs, boolean scheduling) {
enterCritical(CRITICAL_PATH_ADD_HEAD);
synchronized (this) {
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
for (MessageReference ref : refs) {
addSorted(ref, scheduling);
}
@ -1165,8 +1156,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
resetAllIterators();
deliverAsync();
} finally {
leaveCritical(CRITICAL_PATH_ADD_HEAD);
}
}
}
@ -1192,8 +1181,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void addTail(final MessageReference ref, final boolean direct) {
enterCritical(CRITICAL_PATH_ADD_TAIL);
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_TAIL)) {
if (scheduleIfPossible(ref)) {
return;
}
@ -1240,8 +1228,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// Delivery async will both poll for intermediate reference and deliver to clients
deliverAsync();
} finally {
leaveCritical(CRITICAL_PATH_ADD_TAIL);
}
}
@ -1405,8 +1391,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.debug(this + " adding consumer " + consumer);
}
enterCritical(CRITICAL_CONSUMER);
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
@ -1442,10 +1427,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
} finally {
leaveCritical(CRITICAL_CONSUMER);
}
}
@Override
@ -1461,8 +1443,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void removeConsumer(final Consumer consumer) {
enterCritical(CRITICAL_CONSUMER);
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
boolean consumerRemoved = false;
@ -1498,8 +1479,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
} finally {
leaveCritical(CRITICAL_CONSUMER);
}
}
@ -4131,25 +4110,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// this will avoid that possibility
// We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
// an asynchronous delivery
enterCritical(CRITICAL_DELIVER);
boolean needCheckDepage = false;
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_DELIVER)) {
deliverLock.lock();
try {
needCheckDepage = deliver();
} finally {
deliverLock.unlock();
}
} finally {
leaveCritical(CRITICAL_DELIVER);
}
if (needCheckDepage) {
enterCritical(CRITICAL_CHECK_DEPAGE);
try {
try (ArtemisCloseable metric = measureCritical(CRITICAL_CHECK_DEPAGE)) {
checkDepage(true);
} finally {
leaveCritical(CRITICAL_CHECK_DEPAGE);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.jboss.logging.Logger;
public class TransactionImpl implements Transaction {
@ -190,8 +191,7 @@ public class TransactionImpl implements Transaction {
if (logger.isTraceEnabled()) {
logger.trace("TransactionImpl::prepare::" + this);
}
storageManager.readLock();
try {
try (ArtemisCloseable lock = storageManager.closeableReadLock()) {
synchronized (timeoutLock) {
if (isEffective()) {
logger.debug("TransactionImpl::prepare::" + this + " is being ignored");
@ -239,8 +239,6 @@ public class TransactionImpl implements Transaction {
}
});
}
} finally {
storageManager.readUnLock();
}
}

View File

@ -66,6 +66,7 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
@ -216,6 +217,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public ArtemisCloseable closeableReadLock() {
return () -> { };
}
@Override
public void criticalError(Throwable error) {
error.printStackTrace();
@ -745,16 +751,6 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void readLock() {
}
@Override
public void readUnLock() {
}
@Override
public void persistIdGenerator() {

View File

@ -85,6 +85,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
@ -838,13 +839,8 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
public void readLock() {
manager.readLock();
}
@Override
public void readUnLock() {
manager.readUnLock();
public ArtemisCloseable closeableReadLock() {
return manager.closeableReadLock();
}
@Override

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.junit.Assert;
import org.junit.Test;
@ -102,19 +103,26 @@ public class CriticalCrashTest extends SpawnedTestBase {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
public void readLock() {
super.readLock();
public ArtemisCloseable closeableReadLock() {
ArtemisCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();
if (blocked.get()) {
while (true) {
try {
Thread.sleep(1000);
} catch (Throwable ignored) {
}
}
}
return () -> {
storageManagerLock.readLock().unlock();
measure.close();
};
}
@Override
public void storeMessage(Message message) throws Exception {
super.storeMessage(message);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.tests.integration.critical;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.Configuration;
@ -28,6 +29,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
import org.junit.Assert;
import org.junit.Test;
@ -53,6 +55,17 @@ public class CriticalSimpleTest extends ActiveMQTestBase {
});
server.getCriticalAnalyzer().add(new CriticalComponent() {
@Override
public CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
@Override
public CriticalCloseable measureCritical(int path) {
return null;
}
@Override
public boolean checkExpiration(long timeout, boolean reset) {
return true;
@ -82,6 +95,17 @@ public class CriticalSimpleTest extends ActiveMQTestBase {
try {
server.getCriticalAnalyzer().add(new CriticalComponent() {
@Override
public CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
@Override
public CriticalCloseable measureCritical(int path) {
return null;
}
@Override
public boolean checkExpiration(long timeout, boolean reset) {
return true;