ARTEMIS-3271 Improve Critical Analyzer to sample single therads
worked in collaboration with Gary Tully on this fix
This commit is contained in:
parent
42405fedcf
commit
39f468450b
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.activemq.artemis.utils.critical;
|
package org.apache.activemq.artemis.utils.critical;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||||
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||||
|
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -30,10 +31,15 @@ public class CriticalMeasure {
|
||||||
|
|
||||||
//uses updaters to avoid creates many AtomicLong instances
|
//uses updaters to avoid creates many AtomicLong instances
|
||||||
static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter");
|
static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter");
|
||||||
static final AtomicLongFieldUpdater<CriticalMeasure> TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft");
|
|
||||||
|
|
||||||
|
static final AtomicReferenceFieldUpdater<CriticalMeasure, Thread> CURRENT_THREAD_UDPATER = AtomicReferenceFieldUpdater.newUpdater(CriticalMeasure.class, Thread.class, "currentThread");
|
||||||
|
|
||||||
|
// While reseting the leaveMethod, I want to make sure no enter call would reset the value.
|
||||||
|
// so I set the Current Thread to this Ghost Thread, to then set it back to null
|
||||||
|
private static final Thread GHOST_THREAD = new Thread();
|
||||||
|
|
||||||
|
private volatile Thread currentThread;
|
||||||
private volatile long timeEnter;
|
private volatile long timeEnter;
|
||||||
private volatile long timeLeft;
|
|
||||||
|
|
||||||
private final int id;
|
private final int id;
|
||||||
private final CriticalComponent component;
|
private final CriticalComponent component;
|
||||||
|
@ -44,35 +50,49 @@ public class CriticalMeasure {
|
||||||
//prefer this approach instead of using some fixed value because System::nanoTime could change sign
|
//prefer this approach instead of using some fixed value because System::nanoTime could change sign
|
||||||
//with long running processes
|
//with long running processes
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
TIME_LEFT_UPDATER.set(this, time);
|
TIME_ENTER_UPDATER.set(this, 0);
|
||||||
TIME_ENTER_UPDATER.set(this, time);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void enterCritical() {
|
public void enterCritical() {
|
||||||
//prefer lazySet in order to avoid heavy-weight full barriers on x86
|
|
||||||
TIME_ENTER_UPDATER.lazySet(this, System.nanoTime());
|
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
// We should only measure a single thread from all of the threads.
|
||||||
traceEnter = new Exception("entered");
|
// We are fine to ignore any other calls made to this component, we only need one thread measured
|
||||||
|
if (CURRENT_THREAD_UDPATER.compareAndSet(this, null, Thread.currentThread())) {
|
||||||
|
//prefer lazySet in order to avoid heavy-weight full barriers on x86
|
||||||
|
TIME_ENTER_UPDATER.lazySet(this, System.nanoTime());
|
||||||
|
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
traceEnter = new Exception("entered");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void leaveCritical() {
|
public void leaveCritical() {
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
// We should only measure a single thread from all of the threads.
|
||||||
|
// We are fine to ignore any other calls made to this component, we only need one thread measured
|
||||||
|
if (CURRENT_THREAD_UDPATER.compareAndSet(this, Thread.currentThread(), GHOST_THREAD)) {
|
||||||
|
// NULL_THREAD here represents a state where I would be ignoring any call to enterCritical or levalCritical, while I reset the Time Enter Update
|
||||||
|
// This is to avoid replacing time Enter by a new Value, right after current Thread is set to Null.
|
||||||
|
// So we set to this ghost value while we are setting
|
||||||
|
|
||||||
CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null;
|
if (logger.isTraceEnabled()) {
|
||||||
if (analyzer != null) {
|
|
||||||
long nanoTimeout = analyzer.getTimeoutNanoSeconds();
|
CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null;
|
||||||
if (checkExpiration(nanoTimeout, false)) {
|
if (analyzer != null) {
|
||||||
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left"));
|
long nanoTimeout = analyzer.getTimeoutNanoSeconds();
|
||||||
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter);
|
if (checkExpiration(nanoTimeout, false)) {
|
||||||
|
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left"));
|
||||||
|
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
traceEnter = null;
|
||||||
}
|
}
|
||||||
traceEnter = null;
|
TIME_ENTER_UPDATER.set(this, 0);
|
||||||
}
|
|
||||||
|
|
||||||
TIME_LEFT_UPDATER.lazySet(this, System.nanoTime());
|
// I am pretty sure this is single threaded by now.. I don't need compareAndSet here
|
||||||
|
CURRENT_THREAD_UDPATER.set(this, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getComponentName() {
|
protected String getComponentName() {
|
||||||
|
@ -84,12 +104,10 @@ public class CriticalMeasure {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean checkExpiration(long timeout, boolean reset) {
|
public boolean checkExpiration(long timeout, boolean reset) {
|
||||||
long time = System.nanoTime();
|
|
||||||
final long timeLeft = TIME_LEFT_UPDATER.get(this);
|
|
||||||
final long timeEnter = TIME_ENTER_UPDATER.get(this);
|
final long timeEnter = TIME_ENTER_UPDATER.get(this);
|
||||||
//due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing
|
if (timeEnter != 0L) {
|
||||||
if (timeLeft - timeEnter < 0) {
|
long time = System.nanoTime();
|
||||||
boolean expired = System.nanoTime() - timeEnter > timeout;
|
boolean expired = time - timeEnter > timeout;
|
||||||
|
|
||||||
if (expired) {
|
if (expired) {
|
||||||
Exception lastTraceEnter = this.traceEnter;
|
Exception lastTraceEnter = this.traceEnter;
|
||||||
|
@ -100,10 +118,6 @@ public class CriticalMeasure {
|
||||||
logger.warn("Component " + getComponentName() + " is expired on path " + id);
|
logger.warn("Component " + getComponentName() + " is expired on path " + id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reset) {
|
|
||||||
TIME_LEFT_UPDATER.lazySet(this, time);
|
|
||||||
TIME_ENTER_UPDATER.lazySet(this, time);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return expired;
|
return expired;
|
||||||
}
|
}
|
||||||
|
@ -113,8 +127,4 @@ public class CriticalMeasure {
|
||||||
public long enterTime() {
|
public long enterTime() {
|
||||||
return TIME_ENTER_UPDATER.get(this);
|
return TIME_ENTER_UPDATER.get(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long leaveTime() {
|
|
||||||
return TIME_LEFT_UPDATER.get(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -156,6 +156,13 @@ public class Wait {
|
||||||
public static boolean waitFor(final Condition condition,
|
public static boolean waitFor(final Condition condition,
|
||||||
final long durationMillis,
|
final long durationMillis,
|
||||||
final long sleepMillis) {
|
final long sleepMillis) {
|
||||||
|
return waitFor(condition, durationMillis, sleepMillis, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean waitFor(final Condition condition,
|
||||||
|
final long durationMillis,
|
||||||
|
final long sleepMillis,
|
||||||
|
final boolean printThreadDump) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final long expiry = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
|
final long expiry = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
|
||||||
|
@ -168,7 +175,7 @@ public class Wait {
|
||||||
}
|
}
|
||||||
conditionSatisified = condition.isSatisfied();
|
conditionSatisified = condition.isSatisfied();
|
||||||
}
|
}
|
||||||
if (!conditionSatisified) {
|
if (!conditionSatisified && printThreadDump) {
|
||||||
System.out.println(ThreadDumpUtil.threadDump("thread dump"));
|
System.out.println(ThreadDumpUtil.threadDump("thread dump"));
|
||||||
}
|
}
|
||||||
return conditionSatisified;
|
return conditionSatisified;
|
||||||
|
|
|
@ -28,8 +28,7 @@ public class CriticalMeasureTest {
|
||||||
public void testCriticalMeasure() throws Exception {
|
public void testCriticalMeasure() throws Exception {
|
||||||
CriticalMeasure measure = new CriticalMeasure(null, 1);
|
CriticalMeasure measure = new CriticalMeasure(null, 1);
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.SECONDS.toNanos(5));
|
||||||
CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time);
|
|
||||||
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +38,8 @@ public class CriticalMeasureTest {
|
||||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 5);
|
CriticalComponent component = new CriticalComponentImpl(analyzer, 5);
|
||||||
CriticalMeasure measure = new CriticalMeasure(component, 1);
|
CriticalMeasure measure = new CriticalMeasure(component, 1);
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
CriticalMeasure.CURRENT_THREAD_UDPATER.set(measure, Thread.currentThread());
|
||||||
|
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(30));
|
||||||
measure.leaveCritical();
|
measure.leaveCritical();
|
||||||
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,6 @@ public class CriticalMeasureTest {
|
||||||
long time = System.nanoTime();
|
long time = System.nanoTime();
|
||||||
measure.enterCritical();
|
measure.enterCritical();
|
||||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
||||||
CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(10));
|
|
||||||
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||||
measure.leaveCritical();
|
measure.leaveCritical();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.utils.critical;
|
||||||
|
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class MultiThreadCriticalMeasureTest {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(MultiThreadCriticalMeasureTest.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiThread() throws Throwable {
|
||||||
|
int THREADS = 100;
|
||||||
|
AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
Thread[] threads = new Thread[THREADS];
|
||||||
|
AtomicBoolean running = new AtomicBoolean(true);
|
||||||
|
ReusableLatch latch = new ReusableLatch(0);
|
||||||
|
ReusableLatch latchOnMeasure = new ReusableLatch(0);
|
||||||
|
try {
|
||||||
|
CriticalMeasure measure = new CriticalMeasure((t, r) -> false, 0);
|
||||||
|
|
||||||
|
CyclicBarrier barrier = new CyclicBarrier(THREADS + 1);
|
||||||
|
|
||||||
|
Runnable runnable = () -> {
|
||||||
|
try {
|
||||||
|
logger.debug("Thread " + Thread.currentThread().getName() + " waiting to Star");
|
||||||
|
barrier.await();
|
||||||
|
logger.debug("Thread " + Thread.currentThread().getName() + " Started");
|
||||||
|
while (running.get()) {
|
||||||
|
if (!latch.await(1, TimeUnit.NANOSECONDS)) {
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
measure.enterCritical();
|
||||||
|
latchOnMeasure.await();
|
||||||
|
measure.leaveCritical();
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for (int i = 0; i < THREADS; i++) {
|
||||||
|
threads[i] = new Thread(runnable, "t=" + i);
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug("Going to release it now");
|
||||||
|
barrier.await();
|
||||||
|
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
|
||||||
|
logger.debug("Count up " + i);
|
||||||
|
|
||||||
|
// simulating load down on the system... this will freeze load
|
||||||
|
latch.countUp();
|
||||||
|
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(20));
|
||||||
|
Assert.assertFalse(measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(10), false));
|
||||||
|
logger.debug("Count down");
|
||||||
|
|
||||||
|
// this will resume load
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
latchOnMeasure.countUp();
|
||||||
|
|
||||||
|
Assert.assertTrue(Wait.waitFor(() -> measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(100), false), 1_000, 1));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
latchOnMeasure.countDown();
|
||||||
|
running.set(false);
|
||||||
|
|
||||||
|
Assert.assertEquals(0, errors.get());
|
||||||
|
|
||||||
|
for (Thread t : threads) {
|
||||||
|
if (t != null) {
|
||||||
|
t.join(100);
|
||||||
|
if (t.isAlive()) {
|
||||||
|
t.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue