diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ArtemisCloseable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ArtemisCloseable.java new file mode 100644 index 0000000000..e464237a34 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ArtemisCloseable.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +public interface ArtemisCloseable extends AutoCloseable { + + /** The main purpose of this interface is to hide the exception since it is not needed. */ + @Override + void close(); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalCloseable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalCloseable.java new file mode 100644 index 0000000000..e84182cb57 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalCloseable.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.critical; + +import org.apache.activemq.artemis.utils.ArtemisCloseable; + +public interface CriticalCloseable extends ArtemisCloseable { + + + /** This will set something to be called right before closing. + * + * The use case that drove this call was a ReadWriteLock on the journal. + * Imagine that you need to call enterCritical, readWrite.lock() and then unlock and leaveCritical. + * By using this call I could reuse the same instance on the readWriteLock. */ + void beforeClose(ArtemisCloseable otherCloseable); +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java index 39d46141b6..b537e2cd35 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.artemis.utils.critical; - /** * A Critical component enters and leaves a critical state. * You update a long every time you enter a critical path @@ -27,26 +26,9 @@ package org.apache.activemq.artemis.utils.critical; */ public interface CriticalComponent { - default CriticalAnalyzer getCriticalAnalyzer() { - return null; - } - /** - * please save the time you entered here. - * Use volatile variables. - * No locks through anywhere. - */ - default void enterCritical(int path) { - // I'm providing a default as some components may chose to calculate it themselves - } + CriticalAnalyzer getCriticalAnalyzer(); - /** - * please save the time you entered here - * Use volatile variables. - * No locks through anywhere. - */ - default void leaveCritical(int path) { - // I'm providing a default as some components may chose to calculate it themselves - } + CriticalCloseable measureCritical(int path); /** * Check if the component is expired at a given timeout.. on any of its paths. diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java index 528e8e8de4..07614c7c1e 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java @@ -47,17 +47,11 @@ public class CriticalComponentImpl implements CriticalComponent { } @Override - public void enterCritical(int path) { - if (analyzer.isMeasuring()) { - measures[path].enterCritical(); - } - - } - - @Override - public void leaveCritical(int path) { - if (analyzer.isMeasuring()) { - measures[path].leaveCritical(); + public CriticalCloseable measureCritical(int path) { + if (!analyzer.isMeasuring()) { + return CriticalMeasure.dummyCloseable; + } else { + return measures[path].measure(); } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java index a8448bb0a2..e0df253cdf 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java @@ -17,24 +17,58 @@ package org.apache.activemq.artemis.utils.critical; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.jboss.logging.Logger; public class CriticalMeasure { + public static boolean isDummy(ArtemisCloseable closeable) { + return closeable == dummyCloseable; + } + private static final Logger logger = Logger.getLogger(CriticalMeasure.class); // this is used on enterCritical, if the logger is in trace mode private volatile Exception traceEnter; + static final AtomicIntegerFieldUpdater CURRENT_MEASURING = AtomicIntegerFieldUpdater.newUpdater(CriticalMeasure.class, "measuring"); - static final AtomicReferenceFieldUpdater CURRENT_THREAD_UDPATER = AtomicReferenceFieldUpdater.newUpdater(CriticalMeasure.class, Thread.class, "currentThread"); + private final CriticalCloseable autoCloseable = new CriticalCloseable() { + ArtemisCloseable beforeClose; - // While resetting the leaveMethod, I want to make sure no enter call would reset the value. - // so I set the Current Thread to this Ghost Thread, to then set it back to null - private static final Thread GHOST_THREAD = new Thread(); + @Override + public void beforeClose(ArtemisCloseable closeable) { + beforeClose = closeable; + } - private volatile Thread currentThread; + @Override + public void close() { + try { + if (beforeClose != null) { + beforeClose.close(); + beforeClose = null; + } + } finally { + leaveCritical(); + CURRENT_MEASURING.set(CriticalMeasure.this, 0); + } + } + }; + + protected static final CriticalCloseable dummyCloseable = new CriticalCloseable() { + @Override + public void beforeClose(ArtemisCloseable runnable) { + throw new IllegalStateException("The dummy closeable does not support beforeClose. Check before CriticalMeasure.isDummy(closeable) before you call beforeClose(runnable)"); + } + + @Override + public void close() { + } + }; + + // this is working like a boolean, although using AtomicIntegerFieldUpdater instead + protected volatile int measuring; protected volatile long timeEnter; private final int id; @@ -46,45 +80,43 @@ public class CriticalMeasure { this.timeEnter = 0; } - public void enterCritical() { - - // a sampling of a single thread at a time will be sufficient for the analyser, - // typically what causes one thread to stall will repeat on another - if (CURRENT_THREAD_UDPATER.compareAndSet(this, null, Thread.currentThread())) { - timeEnter = System.nanoTime(); - - if (logger.isTraceEnabled()) { - traceEnter = new Exception("entered"); - } + public CriticalCloseable measure() { + // I could have chosen to simply store the time on this value, however I would be calling nanoTime a lot of times + // to just waste the value + // So, I keep a measuring atomic to protect the thread sampling, + // and I will still do the set using a single thread. + if (CURRENT_MEASURING.compareAndSet(this, 0, 1)) { + enterCritical(); + return autoCloseable; + } else { + return dummyCloseable; } } - public void leaveCritical() { + protected void enterCritical() { + timeEnter = System.nanoTime(); - if (CURRENT_THREAD_UDPATER.compareAndSet(this, Thread.currentThread(), GHOST_THREAD)) { - // NULL_THREAD here represents a state where I would be ignoring any call to enterCritical or leaveCritical, while I reset the Time Enter Update - // This is to avoid replacing time Enter by a new Value, right after current Thread is set to Null. - // So we set to this ghost value while we are setting - - if (logger.isTraceEnabled()) { - - CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null; - if (analyzer != null) { - long nanoTimeout = analyzer.getTimeoutNanoSeconds(); - if (checkExpiration(nanoTimeout, false)) { - logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left")); - logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter); - } - } - traceEnter = null; - } - this.timeEnter = 0; - - // I am pretty sure this is single threaded by now.. I don't need compareAndSet here - CURRENT_THREAD_UDPATER.set(this, null); + if (logger.isTraceEnabled()) { + traceEnter = new Exception("entered"); } } + protected void leaveCritical() { + if (logger.isTraceEnabled()) { + + CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null; + if (analyzer != null) { + long nanoTimeout = analyzer.getTimeoutNanoSeconds(); + if (checkExpiration(nanoTimeout, false)) { + logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left")); + logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter); + } + } + traceEnter = null; + } + timeEnter = 0L; + } + protected String getComponentName() { if (component == null) { return "null"; @@ -94,8 +126,8 @@ public class CriticalMeasure { } public boolean checkExpiration(long timeout, boolean reset) { - final long timeEnter = this.timeEnter; - if (timeEnter != 0L) { + final long thisTimeEnter = this.timeEnter; + if (thisTimeEnter != 0L) { long time = System.nanoTime(); boolean expired = time - timeEnter > timeout; diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java index a20f55a4db..01a5668cd3 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.utils.critical; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ThreadLeakCheckRule; import org.junit.After; @@ -41,10 +43,75 @@ public class CriticalAnalyzerTest { } } + @Test + public void testDummy() { + + analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS); + CriticalComponent component = new CriticalComponentImpl(analyzer, 2); + analyzer.add(component); + + CriticalCloseable closeable1 = component.measureCritical(0); + + Assert.assertFalse(CriticalMeasure.isDummy(closeable1)); + + ArtemisCloseable closeable2 = component.measureCritical(0); + + Assert.assertTrue(CriticalMeasure.isDummy(closeable2)); + + closeable1.close(); + + closeable2 = component.measureCritical(0); + + Assert.assertFalse(CriticalMeasure.isDummy(closeable2)); + } + + @Test + public void testCall() { + + analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS); + CriticalComponent component = new CriticalComponentImpl(analyzer, 2); + analyzer.add(component); + + CriticalCloseable closeable = component.measureCritical(0); + Assert.assertFalse(CriticalMeasure.isDummy(closeable)); + + CriticalCloseable dummy = component.measureCritical(0); + + boolean exception = false; + try { + dummy.beforeClose(() -> System.out.println("never hapening")); + } catch (Throwable e) { + exception = true; + } + + Assert.assertTrue(exception); + + AtomicInteger value = new AtomicInteger(0); + + closeable.beforeClose(() -> value.set(1000)); + + Assert.assertEquals(0, value.get()); + + closeable.close(); + + Assert.assertEquals(1000, value.get()); + } + + @Test public void testAction() throws Exception { analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS); analyzer.add(new CriticalComponent() { + @Override + public CriticalAnalyzer getCriticalAnalyzer() { + return null; + } + + @Override + public CriticalCloseable measureCritical(int path) { + return null; + } + @Override public boolean checkExpiration(long timeout, boolean reset) { return true; @@ -77,9 +144,8 @@ public class CriticalAnalyzerTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 2); analyzer.add(component); - component.enterCritical(0); - component.leaveCritical(0); - component.enterCritical(1); + component.measureCritical(0).close(); + component.measureCritical(1); analyzer.start(); @@ -93,7 +159,7 @@ public class CriticalAnalyzerTest { public void testEnterNoLeaveNoExpire() throws Exception { analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); CriticalComponent component = new CriticalComponentImpl(analyzer, 2); - component.enterCritical(0); + component.measureCritical(0); Assert.assertFalse(component.checkExpiration(TimeUnit.MINUTES.toNanos(1), false)); analyzer.stop(); @@ -103,7 +169,7 @@ public class CriticalAnalyzerTest { public void testEnterNoLeaveExpire() throws Exception { analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); CriticalComponent component = new CriticalComponentImpl(analyzer, 2); - component.enterCritical(0); + component.measureCritical(0); Thread.sleep(50); Assert.assertTrue(component.checkExpiration(0, false)); analyzer.stop(); @@ -123,8 +189,7 @@ public class CriticalAnalyzerTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 1); analyzer.add(component); - component.enterCritical(0); - component.leaveCritical(0); + component.measureCritical(0).close(); analyzer.start(); @@ -146,14 +211,14 @@ public class CriticalAnalyzerTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 1); analyzer.add(component); - component.enterCritical(0); + AutoCloseable measure = component.measureCritical(0); Thread.sleep(50); analyzer.start(); Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS)); - component.leaveCritical(0); + measure.close(); latch.setCount(1); diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java index ca8e2b8552..466e91e2b5 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils.critical; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import org.junit.Assert; import org.junit.Test; @@ -38,7 +39,7 @@ public class CriticalMeasureTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 5); CriticalMeasure measure = new CriticalMeasure(component, 1); long time = System.nanoTime(); - CriticalMeasure.CURRENT_THREAD_UDPATER.set(measure, Thread.currentThread()); + measure.enterCritical(); measure.timeEnter = time - TimeUnit.MINUTES.toNanos(30); measure.leaveCritical(); Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); @@ -50,13 +51,26 @@ public class CriticalMeasureTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 5); CriticalMeasure measure = new CriticalMeasure(component, 1); long time = System.nanoTime(); - measure.enterCritical(); + AutoCloseable closeable = measure.measure(); measure.timeEnter = time - TimeUnit.MINUTES.toNanos(5); - Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); + Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); // on this call we should had a reset before // subsequent call without reset should still fail Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), true)); // previous reset should have cleared it Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); - measure.leaveCritical(); + closeable.close(); + } + + @Test + public void testWithCloseable() throws Exception { + CriticalAnalyzer analyzer = new CriticalAnalyzerImpl(); + CriticalComponent component = new CriticalComponentImpl(analyzer, 5); + CriticalMeasure measure = new CriticalMeasure(component, 1); + long time = System.nanoTime(); + try (AutoCloseable theMeasure = component.measureCritical(0)) { + LockSupport.parkNanos(1000); + Assert.assertTrue(component.checkExpiration(100, false)); + } + Assert.assertFalse(component.checkExpiration(100, false)); } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java index 5e1370cb97..4bf1bdf0fc 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java @@ -42,7 +42,7 @@ public class MultiThreadCriticalMeasureTest { ReusableLatch latch = new ReusableLatch(0); ReusableLatch latchOnMeasure = new ReusableLatch(0); try { - CriticalMeasure measure = new CriticalMeasure((t, r) -> false, 0); + CriticalMeasure measure = new CriticalMeasure(null, 0); CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); @@ -56,9 +56,9 @@ public class MultiThreadCriticalMeasureTest { latch.await(); } - measure.enterCritical(); - latchOnMeasure.await(); - measure.leaveCritical(); + try (AutoCloseable closeable = measure.measure()) { + latchOnMeasure.await(); + } } } catch (Throwable e) { e.printStackTrace(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index 91f98953c3..bec7091817 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.jboss.logging.Logger; @@ -56,46 +57,30 @@ public final class TimedBuffer extends CriticalComponentImpl { private static final int MAX_CHECKS_ON_SLEEP = 20; // Attributes ---------------------------------------------------- - - private TimedBufferObserver bufferObserver; - // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread // in spinning and checking the time - and using up CPU in the process - this semaphore is used to // prevent that private final Semaphore spinLimiter = new Semaphore(1); - - private CheckTimer timerRunnable; - private final int bufferSize; - private final ActiveMQBuffer buffer; - - private int bufferLimit = 0; - - private List callbacks; - private final int timeout; - + private final boolean logRates; + private final AtomicLong bytesFlushed = new AtomicLong(0); + private final AtomicLong flushesDone = new AtomicLong(0); + private TimedBufferObserver bufferObserver; + private CheckTimer timerRunnable; + private int bufferLimit = 0; + private List callbacks; // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen private volatile boolean pendingSync = false; + // for logging write rates private Thread timerThread; - private volatile boolean started; - // We use this flag to prevent flush occurring between calling checkSize and addBytes // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer // can get in an inconsistent state private boolean delayFlush; - - // for logging write rates - - private final boolean logRates; - - private final AtomicLong bytesFlushed = new AtomicLong(0); - - private final AtomicLong flushesDone = new AtomicLong(0); - private Timer logRatesTimer; private TimerTask logRatesTimerTask; @@ -135,8 +120,7 @@ public final class TimedBuffer extends CriticalComponentImpl { } public void start() { - enterCritical(CRITICAL_PATH_START); - try { + try (ArtemisCloseable critical = measureCritical(CRITICAL_PATH_START)) { synchronized (this) { if (started) { return; @@ -163,15 +147,12 @@ public final class TimedBuffer extends CriticalComponentImpl { started = true; } - } finally { - leaveCritical(CRITICAL_PATH_START); } } public void stop() { - enterCritical(CRITICAL_PATH_STOP); Thread localTimer = null; - try { + try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_STOP)) { // add critical analyzer here.... <<<< synchronized (this) { try { @@ -210,14 +191,11 @@ public final class TimedBuffer extends CriticalComponentImpl { } } } - } finally { - leaveCritical(CRITICAL_PATH_STOP); } } public void setObserver(final TimedBufferObserver observer) { - enterCritical(CRITICAL_PATH_SET_OBSERVER); - try { + try (AutoCloseable measure = measureCritical(CRITICAL_PATH_SET_OBSERVER)) { synchronized (this) { if (bufferObserver != null) { flush(); @@ -225,8 +203,8 @@ public final class TimedBuffer extends CriticalComponentImpl { bufferObserver = observer; } - } finally { - leaveCritical(CRITICAL_PATH_SET_OBSERVER); + } catch (Exception shouldNotHappen) { + logger.debug(shouldNotHappen); } } @@ -236,8 +214,7 @@ public final class TimedBuffer extends CriticalComponentImpl { * @param sizeChecked */ public boolean checkSize(final int sizeChecked) { - enterCritical(CRITICAL_PATH_CHECK_SIZE); - try { + try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_CHECK_SIZE)) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); @@ -274,14 +251,11 @@ public final class TimedBuffer extends CriticalComponentImpl { return true; } } - } finally { - leaveCritical(CRITICAL_PATH_CHECK_SIZE); } } public void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) { - enterCritical(CRITICAL_PATH_ADD_BYTES); - try { + try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_ADD_BYTES)) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); @@ -303,14 +277,11 @@ public final class TimedBuffer extends CriticalComponentImpl { startSpin(); } } - } finally { - leaveCritical(CRITICAL_PATH_ADD_BYTES); } } public void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { - enterCritical(CRITICAL_PATH_ADD_BYTES); - try { + try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_ADD_BYTES)) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); @@ -328,10 +299,7 @@ public final class TimedBuffer extends CriticalComponentImpl { startSpin(); } } - } finally { - leaveCritical(CRITICAL_PATH_ADD_BYTES); } - } public void flush() { @@ -344,8 +312,7 @@ public final class TimedBuffer extends CriticalComponentImpl { * @return {@code true} when are flushed any bytes, {@code false} otherwise */ public boolean flushBatch() { - enterCritical(CRITICAL_PATH_FLUSH); - try { + try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_FLUSH)) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); @@ -378,8 +345,6 @@ public final class TimedBuffer extends CriticalComponentImpl { return false; } } - } finally { - leaveCritical(CRITICAL_PATH_FLUSH); } } @@ -391,6 +356,43 @@ public final class TimedBuffer extends CriticalComponentImpl { // Inner classes ------------------------------------------------- + /** + * Sub classes (tests basically) can use this to override how the sleep is being done + * + * @param sleepNanos + */ + protected void sleep(long sleepNanos) { + LockSupport.parkNanos(sleepNanos); + } + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void stopSpin() { + if (spinning) { + try { + // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning + // when the buffer is inactive + spinLimiter.acquire(); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + + spinning = false; + } + } + + /** + * Sub classes (tests basically) can use this to override disabling spinning + */ + protected void startSpin() { + if (!spinning) { + spinLimiter.release(); + + spinning = true; + } + } + private class LogRatesTimerTask extends TimerTask { private boolean closed; @@ -434,10 +436,9 @@ public final class TimedBuffer extends CriticalComponentImpl { private class CheckTimer implements Runnable { - private volatile boolean closed = false; - int checks = 0; int failedChecks = 0; + private volatile boolean closed = false; @Override public void run() { @@ -523,41 +524,4 @@ public final class TimedBuffer extends CriticalComponentImpl { } } - /** - * Sub classes (tests basically) can use this to override how the sleep is being done - * - * @param sleepNanos - */ - protected void sleep(long sleepNanos) { - LockSupport.parkNanos(sleepNanos); - } - - /** - * Sub classes (tests basically) can use this to override disabling spinning - */ - protected void stopSpin() { - if (spinning) { - try { - // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning - // when the buffer is inactive - spinLimiter.acquire(); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - - spinning = false; - } - } - - /** - * Sub classes (tests basically) can use this to override disabling spinning - */ - protected void startSpin() { - if (!spinning) { - spinLimiter.release(); - - spinning = true; - } - } - } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 8cbbe0d5a7..8ae57447f9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.SoftValueLongObjectHashMap; import org.apache.activemq.artemis.utils.ThreadDumpUtil; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; @@ -438,80 +439,76 @@ public class PageCursorProviderImpl implements PageCursorProvider { // // I tried to simplify the locks but each PageStore has its own lock, so this was the best option // I found in order to fix https://issues.apache.org/jira/browse/ARTEMIS-3054 - storageManager.readLock(); + try (ArtemisCloseable readLock = storageManager.closeableReadLock()) { - while (true) { - if (pagingStore.lock(100)) { - break; + while (true) { + if (pagingStore.lock(100)) { + break; + } + if (!pagingStore.isStarted()) + return; } - if (!pagingStore.isStarted()) - return; - } - logger.tracef("%s locked", this); - - synchronized (this) { - try { - if (!pagingStore.isStarted()) { - return; - } - - if (pagingStore.getNumberOfPages() == 0) { - return; - } - - ArrayList cursorList = cloneSubscriptions(); - - long minPage = checkMinPage(cursorList); - deliverIfNecessary(cursorList, minPage); - - logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage); - - // if the current page is being written... - // on that case we need to move to verify it in a different way - if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) { - boolean complete = checkPageCompletion(cursorList, minPage); + logger.tracef("%s locked", this); + synchronized (this) { + try { if (!pagingStore.isStarted()) { return; } - // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark - if (complete) { + if (pagingStore.getNumberOfPages() == 0) { + return; + } - cleanupComplete(cursorList); - } - } + ArrayList cursorList = cloneSubscriptions(); - for (long i = pagingStore.getFirstPage(); i <= minPage; i++) { - if (!checkPageCompletion(cursorList, i)) { - break; - } - Page page = pagingStore.depage(); - if (page == null) { - break; - } - depagedPages.add(page); - } + long minPage = checkMinPage(cursorList); + deliverIfNecessary(cursorList, minPage); - if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) { - pagingStore.stopPaging(); - } else { - if (logger.isTraceEnabled()) { - logger.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() + - " as numberOfPages == " + - pagingStore.getNumberOfPages() + - " and currentPage.numberOfMessages = " + - pagingStore.getCurrentPage().getNumberOfMessages()); + logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage); + + // if the current page is being written... + // on that case we need to move to verify it in a different way + if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) { + boolean complete = checkPageCompletion(cursorList, minPage); + + if (!pagingStore.isStarted()) { + return; + } + + // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark + if (complete) { + + cleanupComplete(cursorList); + } } + + for (long i = pagingStore.getFirstPage(); i <= minPage; i++) { + if (!checkPageCompletion(cursorList, i)) { + break; + } + Page page = pagingStore.depage(); + if (page == null) { + break; + } + depagedPages.add(page); + } + + if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && pagingStore.getCurrentPage().getNumberOfMessages() == 0) { + pagingStore.stopPaging(); + } else { + if (logger.isTraceEnabled()) { + logger.trace("Couldn't cleanup page on address " + this.pagingStore.getAddress() + " as numberOfPages == " + pagingStore.getNumberOfPages() + " and currentPage.numberOfMessages = " + pagingStore.getCurrentPage().getNumberOfMessages()); + } + } + } catch (Exception ex) { + ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex, pagingStore.getAddress()); + logger.warn(ex.getMessage(), ex); + return; + } finally { + pagingStore.unlock(); } - } catch (Exception ex) { - ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex, pagingStore.getAddress()); - logger.warn(ex.getMessage(), ex); - return; - } finally { - pagingStore.unlock(); - storageManager.readUnLock(); } } finishCleanup(depagedPages); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index f7a82e1b27..1849dd7f9d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.jboss.logging.Logger; /** @@ -254,8 +255,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { @Override public void delete(Transaction tx) throws Exception { // always lock the StorageManager first. - storage.readLock(); - try { + try (ArtemisCloseable lock = storage.closeableReadLock()) { synchronized (this) { for (Long record : incrementRecords) { storage.deleteIncrementRecord(tx.getID(), record.longValue()); @@ -271,8 +271,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { value.set(0); incrementRecords.clear(); } - } finally { - storage.readUnLock(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 48e44542c3..ca53c7c24c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.IDGenerator; /** @@ -479,15 +480,9 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * say Paging classes, that use locks of their own AND also write through the StorageManager MUST * first read lock the storageManager before taking their own locks. Otherwise, we may dead-lock * when starting replication sync. - */ - void readLock(); - - /** - * Unlock the manager. * - * @see StorageManager#readLock() */ - void readUnLock(); + ArtemisCloseable closeableReadLock(); /** * Closes the {@link IDGenerator} persisting the current record ID. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index ae7985ef1b..06b3fe6916 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -116,12 +116,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap; import org.apache.activemq.artemis.utils.collections.SparseArrayLinkedList; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.apache.activemq.artemis.utils.critical.CriticalCloseable; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; +import org.apache.activemq.artemis.utils.critical.CriticalMeasure; import org.jboss.logging.Logger; /** @@ -171,6 +174,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true); + // I would rather cache the Closeable instance here.. + // I never know when the JRE decides to create a new instance on every call. + // So I'm playing safe here. That's all + protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock; + protected Journal messageJournal; protected Journal bindingsJournal; @@ -340,12 +348,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { installLargeMessageConfirmationOnTX(tx, recordID); messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, messageID)); - } finally { - readUnLock(); } } @@ -354,11 +359,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp */ @Override public void confirmPendingLargeMessage(long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecord(recordID, true, getContext()); - } finally { - readUnLock(); } } @@ -369,9 +371,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); } - readLock(); - try { - // Note that we don't sync, the add reference that comes immediately after will sync if + try (ArtemisCloseable lock = closeableReadLock()) { // Note that we don't sync, the add reference that comes immediately after will sync if // appropriate if (message.isLargeMessage() && message instanceof LargeServerMessageImpl) { @@ -379,109 +379,100 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } else { messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false)); } - } finally { - readUnLock(); } } @Override public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional)); - } finally { - readUnLock(); } } @Override - public void readLock() { - enterCritical(CRITICAL_STORE); + public ArtemisCloseable closeableReadLock() { + CriticalCloseable measure = measureCritical(CRITICAL_STORE); storageManagerLock.readLock().lock(); + + if (CriticalMeasure.isDummy(measure)) { + // The next statement could have been called like this: + // return storageManagerLock.readLock()::unlock; + // However I wasn't 100% sure the JDK would take good care + // of caching for me. + // Since this is important to me here, I decided to play safe and + // cache it myself + return unlockCloseable; + } else { + // Same applies to the next statement here + // measure.beforeClose(storageManagerLock.readLock()::unlock); + // I'm just playing safe and caching it myself + measure.beforeClose(unlockCloseable); + return measure; + } } - @Override - public void readUnLock() { - storageManagerLock.readLock().unlock(); - leaveCritical(CRITICAL_STORE); - } - - /** for internal use and testsuite, don't use it outside of tests */ + /** + * for internal use and testsuite, don't use it outside of tests + */ public void writeLock() { storageManagerLock.writeLock().lock(); } - /** for internal use and testsuite, don't use it outside of tests */ + /** + * for internal use and testsuite, don't use it outside of tests + */ public void writeUnlock() { storageManagerLock.writeLock().unlock(); } @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @Override public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long ackID = idGenerator.generateID(); position.setRecordID(ackID); messageJournal.appendAddRecord(ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @Override public boolean deleteMessage(final long messageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { // Messages are deleted on postACK, one after another. // If these deletes are synchronized, we would build up messages on the Executor // increasing chances of losing deletes. // The StorageManager should verify messages without references return messageJournal.tryAppendDeleteRecord(messageID, false, getContext(false)); - } finally { - readUnLock(); } } @Override public boolean updateScheduledDeliveryTime(final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @Override public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID); messageJournal.appendAddRecord(recordID, JournalRecordIds.DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @Override public void deleteDuplicateID(final long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @@ -493,8 +484,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); } - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (message.isLargeMessage() && message instanceof LargeServerMessageImpl) { // this is a core large message messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message); @@ -502,19 +492,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message); } - } finally { - readUnLock(); } } @Override public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { pageTransaction.setRecordID(generateID()); messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, pageTransaction); - } finally { - readUnLock(); } } @@ -522,21 +507,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages)); - } finally { - readUnLock(); } } @Override public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID)); - } finally { - readUnLock(); } } @@ -544,23 +523,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID)); - } finally { - readUnLock(); } } @Override public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long ackID = idGenerator.generateID(); position.setRecordID(ackID); messageJournal.appendAddRecordTransactional(txID, ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position)); - } finally { - readUnLock(); } } @@ -578,11 +551,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecordTransactional(txID, ackID); - } finally { - readUnLock(); } } @@ -593,57 +563,40 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long id = generateID(); messageJournal.appendAddRecord(id, JournalRecordIds.HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true)); return id; - } finally { - readUnLock(); } } @Override public void deleteHeuristicCompletion(final long id) throws Exception { - readLock(); - try { - + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecord(id, true, getContext(true)); - } finally { - readUnLock(); } } @Override public void deletePageTransactional(final long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecord(recordID, false); - } finally { - readUnLock(); } } @Override public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); - readLock(); - try { - + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecordTransactional(txID, ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding); - } finally { - readUnLock(); } } @Override public void prepare(final long txID, final Xid xid) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional)); - } finally { - readUnLock(); } } @@ -665,8 +618,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void commit(final long txID, final boolean lineUpContext) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext); if (!lineUpContext && !syncTransactional) { if (logger.isTraceEnabled()) { @@ -681,18 +633,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp */ getContext(true).done(); } - } finally { - readUnLock(); } } @Override public void rollback(final long txID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional)); - } finally { - readUnLock(); } } @@ -703,11 +650,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final long recordID) throws Exception { DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding); - } finally { - readUnLock(); } } @@ -718,21 +662,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final long recordID) throws Exception { DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendUpdateRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding); - } finally { - readUnLock(); } } @Override public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } finally { - readUnLock(); } } @@ -749,25 +687,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp ref.setPersistedCount(ref.getDeliveryCount()); DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { return messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); } } @Override public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception { deleteAddressSetting(addressSetting.getAddressMatch()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long id = idGenerator.generateID(); addressSetting.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.ADDRESS_SETTING_RECORD, addressSetting, true); mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting); - } finally { - readUnLock(); } } @@ -785,28 +717,22 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception { deleteSecuritySetting(persistedRoles.getAddressMatch()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long id = idGenerator.generateID(); persistedRoles.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.SECURITY_SETTING_RECORD, persistedRoles, true); mapPersistedSecuritySettings.put(persistedRoles.getAddressMatch(), persistedRoles); - } finally { - readUnLock(); } } @Override public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception { deleteDivertConfiguration(persistedDivertConfiguration.getName()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long id = idGenerator.generateID(); persistedDivertConfiguration.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.DIVERT_RECORD, persistedDivertConfiguration, true); mapPersistedDivertConfigurations.put(persistedDivertConfiguration.getName(), persistedDivertConfiguration); - } finally { - readUnLock(); } } @@ -814,11 +740,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void deleteDivertConfiguration(String divertName) throws Exception { PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName); if (oldDivert != null) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(oldDivert.getStoreId(), false); - } finally { - readUnLock(); } } } @@ -831,14 +754,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeUser(PersistedUser persistedUser) throws Exception { deleteUser(persistedUser.getUsername()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long id = idGenerator.generateID(); persistedUser.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.USER_RECORD, persistedUser, true); mapPersistedUsers.put(persistedUser.getUsername(), persistedUser); - } finally { - readUnLock(); } } @@ -846,11 +766,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void deleteUser(String username) throws Exception { PersistedUser oldUser = mapPersistedUsers.remove(username); if (oldUser != null) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(oldUser.getStoreId(), false); - } finally { - readUnLock(); } } } @@ -863,14 +780,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeRole(PersistedRole persistedRole) throws Exception { deleteRole(persistedRole.getUsername()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long id = idGenerator.generateID(); persistedRole.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.ROLE_RECORD, persistedRole, true); mapPersistedRoles.put(persistedRole.getUsername(), persistedRole); - } finally { - readUnLock(); } } @@ -878,11 +792,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void deleteRole(String username) throws Exception { PersistedRole oldRole = mapPersistedRoles.remove(username); if (oldRole != null) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(oldRole.getStoreId(), false); - } finally { - readUnLock(); } } } @@ -894,21 +805,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeID(final long journalID, final long id) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendAddRecord(journalID, JournalRecordIds.ID_COUNTER_RECORD, BatchingIDGenerator.createIDEncodingSupport(id), true, getContext(true)); - } finally { - readUnLock(); } } @Override public void deleteID(long journalD) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(journalD, false); - } finally { - readUnLock(); } } @@ -916,11 +821,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void deleteAddressSetting(SimpleString addressMatch) throws Exception { PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); if (oldSetting != null) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false); - } finally { - readUnLock(); } } } @@ -929,11 +831,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void deleteSecuritySetting(SimpleString addressMatch) throws Exception { PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch); if (oldRoles != null) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false); - } finally { - readUnLock(); } } } @@ -954,9 +853,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp Set invalidPageTransactions = new HashSet<>(); Map messages = new HashMap<>(); - readLock(); - try { - + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.setRemoveExtraFilesOnLoad(true); JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this)); @@ -976,13 +873,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final MutableLong recordNumber = new MutableLong(); final CoreMessageObjectPools pools; if (totalSize > 0) { - final int addresses = (int)Math.max( - DEFAULT_POOL_CAPACITY, - queueInfos == null ? 0 : - queueInfos.values().stream() - .map(QueueBindingInfo::getAddress) - .filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH) - .count() * 2); + final int addresses = (int) Math.max(DEFAULT_POOL_CAPACITY, queueInfos == null ? 0 : queueInfos.values().stream().map(QueueBindingInfo::getAddress).filter(addr -> addr.length() <= DEFAULT_MAX_LENGTH).count() * 2); pools = new CoreMessageObjectPools(addresses, DEFAULT_POOL_CAPACITY, 128, 128); } else { pools = null; @@ -1328,8 +1219,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp journalLoaded = true; return info; - } finally { - readUnLock(); } } @@ -1380,21 +1269,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void addGrouping(final GroupBinding groupBinding) throws Exception { GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendAddRecord(groupBinding.getId(), JournalRecordIds.GROUP_RECORD, groupingEncoding, true); - } finally { - readUnLock(); } } @Override public void deleteGrouping(long tx, final GroupBinding groupBinding) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecordTransactional(tx, groupBinding.getId()); - } finally { - readUnLock(); } } @@ -1419,25 +1302,19 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isEnabled(), queue.isExclusive(), queue.isGroupRebalance(), queue.isGroupRebalancePauseDispatch(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDelete(), queue.getAutoDeleteDelay(), queue.getAutoDeleteMessageCount(), queue.getRoutingType().getType(), queue.isConfigurationManaged(), queue.getRingSize()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (update) { bindingsJournal.appendUpdateRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); } else { bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); } - } finally { - readUnLock(); } } @Override public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecordTransactional(tx, queueBindingID); - } finally { - readUnLock(); } } @@ -1445,24 +1322,17 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exception { long recordID = idGenerator.generateID(); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendAddRecord(recordID, JournalRecordIds.QUEUE_STATUS_RECORD, new QueueStatusEncoding(queueID, status), true); - } finally { - readUnLock(); } - return recordID; } @Override public void deleteQueueStatus(long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(recordID, true); - } finally { - readUnLock(); } } @@ -1470,132 +1340,96 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public long storeAddressStatus(long addressID, AddressQueueStatus status) throws Exception { long recordID = idGenerator.generateID(); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendAddRecord(recordID, JournalRecordIds.ADDRESS_STATUS_RECORD, new AddressStatusEncoding(addressID, status), true); - } finally { - readUnLock(); } - return recordID; } @Override public void deleteAddressStatus(long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecord(recordID, true); - } finally { - readUnLock(); } } @Override public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { - PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), - addressInfo.getRoutingTypes(), - addressInfo.isAutoCreated()); + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingTypes(), addressInfo.isAutoCreated()); - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long recordID = idGenerator.generateID(); bindingEncoding.setId(recordID); addressInfo.setId(recordID); bindingsJournal.appendAddRecordTransactional(tx, recordID, JournalRecordIds.ADDRESS_BINDING_RECORD, bindingEncoding); - } finally { - readUnLock(); } } @Override public void deleteAddressBinding(long tx, final long addressBindingID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { bindingsJournal.appendDeleteRecordTransactional(tx, addressBindingID); - } finally { - readUnLock(); } } @Override public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long recordID = idGenerator.generateID(); messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize)); return recordID; - } finally { - readUnLock(); } } @Override public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long recordID = idGenerator.generateID(); messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize), true, getContext()); return recordID; - } finally { - readUnLock(); } } @Override public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long recordID = idGenerator.generateID(); messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, persistentSize)); return recordID; - } finally { - readUnLock(); } } @Override public long storePendingCounter(final long queueID, final long pageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { final long recordID = idGenerator.generateID(); PageCountPendingImpl pendingInc = new PageCountPendingImpl(queueID, pageID); // We must guarantee the record sync before we actually write on the page otherwise we may get out of sync // on the counter messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true); return recordID; - } finally { - readUnLock(); } } @Override public void deleteIncrementRecord(long txID, long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } finally { - readUnLock(); } } @Override public void deletePageCounter(long txID, long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } finally { - readUnLock(); } } @Override public void deletePendingPageCounter(long txID, long recordID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } finally { - readUnLock(); } } @@ -1694,11 +1528,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void lineUpContext() { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.lineUpContext(getContext()); - } finally { - readUnLock(); } } @@ -1787,15 +1618,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp * TODO: Is this still being used ? */ public JournalLoadInformation[] loadInternalOnly() throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { JournalLoadInformation[] info = new JournalLoadInformation[2]; info[0] = bindingsJournal.loadInternalOnly(); info[1] = messageJournal.loadInternalOnly(); return info; - } finally { - readUnLock(); } } @@ -2171,6 +1999,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp persistedRole.setStoreId(id); return persistedRole; } + /** * @param id * @param buffer @@ -2221,10 +2050,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public boolean addToPage(PagingStore store, - Message msg, - Transaction tx, - RouteContextList listCtx) throws Exception { + public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception { /** * Exposing the read-lock here is an encapsulation violation done in order to keep the code * simpler. The alternative would be to add a second method, say 'verifyPaging', to diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index c01b411ba9..8f9ad41a49 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; @@ -253,14 +254,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public void stop(boolean ioCriticalError, boolean sendFailover) throws Exception { - try { - enterCritical(CRITICAL_STOP); + try (ArtemisCloseable critical = measureCritical(CRITICAL_STOP)) { synchronized (this) { if (internalStop(ioCriticalError, sendFailover)) return; } - } finally { - leaveCritical(CRITICAL_STOP); } } @@ -290,39 +288,39 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // that's ok } - enterCritical(CRITICAL_STOP_2); - storageManagerLock.writeLock().lock(); - try { + try (ArtemisCloseable critical = measureCritical(CRITICAL_STOP_2)) { + storageManagerLock.writeLock().lock(); + try { - // We cache the variable as the replicator could be changed between here and the time we call stop - // since sendLiveIsStopping may issue a close back from the channel - // and we want to ensure a stop here just in case - ReplicationManager replicatorInUse = replicator; - if (replicatorInUse != null) { - if (sendFailover) { - final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER); - if (token != null) { - try { - token.waitCompletion(5000); - } catch (Exception e) { - // ignore it + // We cache the variable as the replicator could be changed between here and the time we call stop + // since sendLiveIsStopping may issue a close back from the channel + // and we want to ensure a stop here just in case + ReplicationManager replicatorInUse = replicator; + if (replicatorInUse != null) { + if (sendFailover) { + final OperationContext token = replicator.sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER); + if (token != null) { + try { + token.waitCompletion(5000); + } catch (Exception e) { + // ignore it + } } } + // we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown + // while the backup will never receive then + replicatorInUse.stop(false); } - // we cannot clear replication tokens, otherwise clients will eventually be informed of completion during a server's shutdown - // while the backup will never receive then - replicatorInUse.stop(false); + bindingsJournal.stop(); + + messageJournal.stop(); + + journalLoaded = false; + + started = false; + } finally { + storageManagerLock.writeLock().unlock(); } - bindingsJournal.stop(); - - messageJournal.stop(); - - journalLoaded = false; - - started = false; - } finally { - storageManagerLock.writeLock().unlock(); - leaveCritical(CRITICAL_STOP_2); } return false; } @@ -388,12 +386,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public void pageClosed(final SimpleString storeName, final int pageNumber) { if (isReplicated()) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated()) replicator.pageClosed(storeName, pageNumber); - } finally { - readUnLock(); } } } @@ -401,12 +396,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public void pageDeleted(final SimpleString storeName, final int pageNumber) { if (isReplicated()) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated()) replicator.pageDeleted(storeName, pageNumber); - } finally { - readUnLock(); } } } @@ -420,12 +412,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // Say you are sending durable and non-durable messages to a page // The ACKs would be done to wrong positions, and the backup would be a mess - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated()) replicator.pageWrite(message, pageNumber); - } finally { - readUnLock(); } } } @@ -441,26 +430,20 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } public long storePendingLargeMessage(final long messageID) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { long recordID = generateID(); messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true)); return recordID; - } finally { - readUnLock(); } } @Override public void largeMessageClosed(LargeServerMessage largeServerMessage) throws ActiveMQException { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated()) { replicator.largeMessageClosed(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this); } - } finally { - readUnLock(); } } @@ -485,22 +468,18 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } if (largeServerMessage.toMessage().isDurable() && isReplicated()) { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated() && replicator.isSynchronizing()) { largeMessagesToDelete.put(largeServerMessage.toMessage().getMessageID(), largeServerMessage); return; } - } finally { - readUnLock(); } } Runnable deleteAction = new Runnable() { @Override public void run() { try { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (replicator != null) { replicator.largeMessageDelete(largeServerMessage.toMessage().getMessageID(), JournalStorageManager.this); } @@ -508,8 +487,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // The confirm could only be done after the actual delete is done confirmLargeMessage(largeServerMessage); - } finally { - readUnLock(); } } catch (Exception e) { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessage.toMessage().getMessageID()); @@ -542,8 +519,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { if (isReplicated()) { replicator.largeMessageBegin(id); } @@ -553,8 +529,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessage.moveHeadersAndProperties(message); return largeMessageCreated(id, largeMessage); - } finally { - readUnLock(); } } @@ -865,8 +839,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final ActiveMQBuffer bytes) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { file.position(file.size()); if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) { final ByteBuffer nioBytes = bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); @@ -883,8 +856,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { bytes.readBytes(bytesCopy); addBytesToLargeMessage(file, messageId, bytesCopy); } - } finally { - readUnLock(); } } @@ -892,8 +863,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { public final void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception { - readLock(); - try { + try (ArtemisCloseable lock = closeableReadLock()) { file.position(file.size()); //that's an additional precaution to avoid ByteBuffer to be pooled: //NIOSequentialFileFactory doesn't pool heap ByteBuffer, but better to make evident @@ -903,8 +873,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (isReplicated()) { replicator.largeMessageWrite(messageId, bytes); } - } finally { - readUnLock(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 8375060347..3c3824caaf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.utils.ArtemisCloseable; public class NullStorageManager implements StorageManager { @@ -78,6 +79,13 @@ public class NullStorageManager implements StorageManager { this.ioCriticalErrorListener = ioCriticalErrorListener; } + private static final ArtemisCloseable dummy = () -> { }; + + @Override + public ArtemisCloseable closeableReadLock() { + return dummy; + } + public NullStorageManager() { this(new IOCriticalErrorListener() { @Override @@ -682,16 +690,6 @@ public class NullStorageManager implements StorageManager { // no-op } - @Override - public void readLock() { - // no-op - } - - @Override - public void readUnLock() { - // no-op - } - @Override public void persistIdGenerator() { // no-op diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 53eaafbfe2..d9a93a6ee8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -100,6 +100,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.BooleanUtil; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; @@ -1087,9 +1088,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addHead(final MessageReference ref, boolean scheduling) { - enterCritical(CRITICAL_PATH_ADD_HEAD); - synchronized (this) { - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { + synchronized (this) { if (ringSize != -1) { enforceRing(ref, scheduling, true); } @@ -1103,8 +1103,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { directDeliver = false; } - } finally { - leaveCritical(CRITICAL_PATH_ADD_HEAD); } } } @@ -1112,9 +1110,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addSorted(final MessageReference ref, boolean scheduling) { - enterCritical(CRITICAL_PATH_ADD_HEAD); - synchronized (this) { - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { + synchronized (this) { if (ringSize != -1) { enforceRing(ref, false, true); } @@ -1127,8 +1124,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { directDeliver = false; } - } finally { - leaveCritical(CRITICAL_PATH_ADD_HEAD); } } } @@ -1136,9 +1131,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addHead(final List refs, boolean scheduling) { - enterCritical(CRITICAL_PATH_ADD_HEAD); - synchronized (this) { - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { + synchronized (this) { for (MessageReference ref : refs) { addHead(ref, scheduling); } @@ -1146,8 +1140,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { resetAllIterators(); deliverAsync(); - } finally { - leaveCritical(CRITICAL_PATH_ADD_HEAD); } } } @@ -1155,9 +1147,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addSorted(final List refs, boolean scheduling) { - enterCritical(CRITICAL_PATH_ADD_HEAD); - synchronized (this) { - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { + synchronized (this) { for (MessageReference ref : refs) { addSorted(ref, scheduling); } @@ -1165,8 +1156,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { resetAllIterators(); deliverAsync(); - } finally { - leaveCritical(CRITICAL_PATH_ADD_HEAD); } } } @@ -1192,8 +1181,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void addTail(final MessageReference ref, final boolean direct) { - enterCritical(CRITICAL_PATH_ADD_TAIL); - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_TAIL)) { if (scheduleIfPossible(ref)) { return; } @@ -1240,8 +1228,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // Delivery async will both poll for intermediate reference and deliver to clients deliverAsync(); - } finally { - leaveCritical(CRITICAL_PATH_ADD_TAIL); } } @@ -1405,8 +1391,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.debug(this + " adding consumer " + consumer); } - enterCritical(CRITICAL_CONSUMER); - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) { synchronized (this) { if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); @@ -1442,10 +1427,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - } finally { - leaveCritical(CRITICAL_CONSUMER); } - } @Override @@ -1461,8 +1443,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void removeConsumer(final Consumer consumer) { - enterCritical(CRITICAL_CONSUMER); - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) { synchronized (this) { boolean consumerRemoved = false; @@ -1498,8 +1479,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - } finally { - leaveCritical(CRITICAL_CONSUMER); } } @@ -4131,25 +4110,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // this will avoid that possibility // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing // an asynchronous delivery - enterCritical(CRITICAL_DELIVER); boolean needCheckDepage = false; - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_DELIVER)) { deliverLock.lock(); try { needCheckDepage = deliver(); } finally { deliverLock.unlock(); } - } finally { - leaveCritical(CRITICAL_DELIVER); } if (needCheckDepage) { - enterCritical(CRITICAL_CHECK_DEPAGE); - try { + try (ArtemisCloseable metric = measureCritical(CRITICAL_CHECK_DEPAGE)) { checkDepage(true); - } finally { - leaveCritical(CRITICAL_CHECK_DEPAGE); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index e14d31ddf9..d35c2a89c2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.RefsOperation; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.jboss.logging.Logger; public class TransactionImpl implements Transaction { @@ -190,8 +191,7 @@ public class TransactionImpl implements Transaction { if (logger.isTraceEnabled()) { logger.trace("TransactionImpl::prepare::" + this); } - storageManager.readLock(); - try { + try (ArtemisCloseable lock = storageManager.closeableReadLock()) { synchronized (timeoutLock) { if (isEffective()) { logger.debug("TransactionImpl::prepare::" + this + " is being ignored"); @@ -239,8 +239,6 @@ public class TransactionImpl implements Transaction { } }); } - } finally { - storageManager.readUnLock(); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 2deefdf9e0..bd1979b28b 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -66,6 +66,7 @@ import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Test; @@ -216,6 +217,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } + @Override + public ArtemisCloseable closeableReadLock() { + return () -> { }; + } + @Override public void criticalError(Throwable error) { error.printStackTrace(); @@ -745,16 +751,6 @@ public class TransactionImplTest extends ActiveMQTestBase { } - @Override - public void readLock() { - - } - - @Override - public void readUnLock() { - - } - @Override public void persistIdGenerator() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index d16067a97f..64a4c651a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -85,6 +85,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.util.SpawnedTestBase; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.After; @@ -838,13 +839,8 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override - public void readLock() { - manager.readLock(); - } - - @Override - public void readUnLock() { - manager.readUnLock(); + public ArtemisCloseable closeableReadLock() { + return manager.closeableReadLock(); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java index 979954c53d..54eb0ab5b1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule; import org.apache.activemq.artemis.tests.util.SpawnedTestBase; +import org.apache.activemq.artemis.utils.ArtemisCloseable; import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.junit.Assert; import org.junit.Test; @@ -102,19 +103,26 @@ public class CriticalCrashTest extends SpawnedTestBase { JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) { @Override - public void readLock() { - super.readLock(); + public ArtemisCloseable closeableReadLock() { + ArtemisCloseable measure = measureCritical(CRITICAL_STORE); + storageManagerLock.readLock().lock(); + if (blocked.get()) { while (true) { try { Thread.sleep(1000); } catch (Throwable ignored) { - } } } + + return () -> { + storageManagerLock.readLock().unlock(); + measure.close(); + }; } + @Override public void storeMessage(Message message) throws Exception { super.storeMessage(message); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java index d9b28e6ac0..d0b022fc0c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalSimpleTest.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.tests.integration.critical; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.config.Configuration; @@ -28,6 +29,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.critical.CriticalCloseable; import org.apache.activemq.artemis.utils.critical.CriticalComponent; import org.junit.Assert; import org.junit.Test; @@ -53,6 +55,17 @@ public class CriticalSimpleTest extends ActiveMQTestBase { }); server.getCriticalAnalyzer().add(new CriticalComponent() { + + @Override + public CriticalAnalyzer getCriticalAnalyzer() { + return null; + } + + @Override + public CriticalCloseable measureCritical(int path) { + return null; + } + @Override public boolean checkExpiration(long timeout, boolean reset) { return true; @@ -82,6 +95,17 @@ public class CriticalSimpleTest extends ActiveMQTestBase { try { server.getCriticalAnalyzer().add(new CriticalComponent() { + + @Override + public CriticalAnalyzer getCriticalAnalyzer() { + return null; + } + + @Override + public CriticalCloseable measureCritical(int path) { + return null; + } + @Override public boolean checkExpiration(long timeout, boolean reset) { return true;