From 39f468450b0fba84cdcda4556afd047fa4156cc1 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 27 Apr 2021 14:15:03 -0400 Subject: [PATCH] ARTEMIS-3271 Improve Critical Analyzer to sample single therads worked in collaboration with Gary Tully on this fix --- .../utils/critical/CriticalMeasure.java | 72 ++++++----- .../apache/activemq/artemis/utils/Wait.java | 9 +- .../utils/critical/CriticalMeasureTest.java | 7 +- .../MultiThreadCriticalMeasureTest.java | 114 ++++++++++++++++++ 4 files changed, 166 insertions(+), 36 deletions(-) create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java index a32fe8c15f..ecdb8da37a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils.critical; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.jboss.logging.Logger; @@ -30,10 +31,15 @@ public class CriticalMeasure { //uses updaters to avoid creates many AtomicLong instances static final AtomicLongFieldUpdater TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter"); - static final AtomicLongFieldUpdater TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft"); + static final AtomicReferenceFieldUpdater CURRENT_THREAD_UDPATER = AtomicReferenceFieldUpdater.newUpdater(CriticalMeasure.class, Thread.class, "currentThread"); + + // While reseting the leaveMethod, I want to make sure no enter call would reset the value. + // so I set the Current Thread to this Ghost Thread, to then set it back to null + private static final Thread GHOST_THREAD = new Thread(); + + private volatile Thread currentThread; private volatile long timeEnter; - private volatile long timeLeft; private final int id; private final CriticalComponent component; @@ -44,35 +50,49 @@ public class CriticalMeasure { //prefer this approach instead of using some fixed value because System::nanoTime could change sign //with long running processes long time = System.nanoTime(); - TIME_LEFT_UPDATER.set(this, time); - TIME_ENTER_UPDATER.set(this, time); + TIME_ENTER_UPDATER.set(this, 0); } public void enterCritical() { - //prefer lazySet in order to avoid heavy-weight full barriers on x86 - TIME_ENTER_UPDATER.lazySet(this, System.nanoTime()); - if (logger.isTraceEnabled()) { - traceEnter = new Exception("entered"); + // We should only measure a single thread from all of the threads. + // We are fine to ignore any other calls made to this component, we only need one thread measured + if (CURRENT_THREAD_UDPATER.compareAndSet(this, null, Thread.currentThread())) { + //prefer lazySet in order to avoid heavy-weight full barriers on x86 + TIME_ENTER_UPDATER.lazySet(this, System.nanoTime()); + + if (logger.isTraceEnabled()) { + traceEnter = new Exception("entered"); + } } } public void leaveCritical() { - if (logger.isTraceEnabled()) { + // We should only measure a single thread from all of the threads. + // We are fine to ignore any other calls made to this component, we only need one thread measured + if (CURRENT_THREAD_UDPATER.compareAndSet(this, Thread.currentThread(), GHOST_THREAD)) { + // NULL_THREAD here represents a state where I would be ignoring any call to enterCritical or levalCritical, while I reset the Time Enter Update + // This is to avoid replacing time Enter by a new Value, right after current Thread is set to Null. + // So we set to this ghost value while we are setting - CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null; - if (analyzer != null) { - long nanoTimeout = analyzer.getTimeoutNanoSeconds(); - if (checkExpiration(nanoTimeout, false)) { - logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left")); - logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter); + if (logger.isTraceEnabled()) { + + CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null; + if (analyzer != null) { + long nanoTimeout = analyzer.getTimeoutNanoSeconds(); + if (checkExpiration(nanoTimeout, false)) { + logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("left")); + logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter); + } } + traceEnter = null; } - traceEnter = null; - } + TIME_ENTER_UPDATER.set(this, 0); - TIME_LEFT_UPDATER.lazySet(this, System.nanoTime()); + // I am pretty sure this is single threaded by now.. I don't need compareAndSet here + CURRENT_THREAD_UDPATER.set(this, null); + } } protected String getComponentName() { @@ -84,12 +104,10 @@ public class CriticalMeasure { } public boolean checkExpiration(long timeout, boolean reset) { - long time = System.nanoTime(); - final long timeLeft = TIME_LEFT_UPDATER.get(this); final long timeEnter = TIME_ENTER_UPDATER.get(this); - //due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing - if (timeLeft - timeEnter < 0) { - boolean expired = System.nanoTime() - timeEnter > timeout; + if (timeEnter != 0L) { + long time = System.nanoTime(); + boolean expired = time - timeEnter > timeout; if (expired) { Exception lastTraceEnter = this.traceEnter; @@ -100,10 +118,6 @@ public class CriticalMeasure { logger.warn("Component " + getComponentName() + " is expired on path " + id); } - if (reset) { - TIME_LEFT_UPDATER.lazySet(this, time); - TIME_ENTER_UPDATER.lazySet(this, time); - } } return expired; } @@ -113,8 +127,4 @@ public class CriticalMeasure { public long enterTime() { return TIME_ENTER_UPDATER.get(this); } - - public long leaveTime() { - return TIME_LEFT_UPDATER.get(this); - } } \ No newline at end of file diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java index a6da336d03..8e8d61da62 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java @@ -156,6 +156,13 @@ public class Wait { public static boolean waitFor(final Condition condition, final long durationMillis, final long sleepMillis) { + return waitFor(condition, durationMillis, sleepMillis, true); + } + + public static boolean waitFor(final Condition condition, + final long durationMillis, + final long sleepMillis, + final boolean printThreadDump) { try { final long expiry = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis); @@ -168,7 +175,7 @@ public class Wait { } conditionSatisified = condition.isSatisfied(); } - if (!conditionSatisified) { + if (!conditionSatisified && printThreadDump) { System.out.println(ThreadDumpUtil.threadDump("thread dump")); } return conditionSatisified; diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java index 330892e757..0e4d7ec630 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalMeasureTest.java @@ -28,8 +28,7 @@ public class CriticalMeasureTest { public void testCriticalMeasure() throws Exception { CriticalMeasure measure = new CriticalMeasure(null, 1); long time = System.nanoTime(); - CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5)); - CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time); + CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.SECONDS.toNanos(5)); Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); } @@ -39,7 +38,8 @@ public class CriticalMeasureTest { CriticalComponent component = new CriticalComponentImpl(analyzer, 5); CriticalMeasure measure = new CriticalMeasure(component, 1); long time = System.nanoTime(); - CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5)); + CriticalMeasure.CURRENT_THREAD_UDPATER.set(measure, Thread.currentThread()); + CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(30)); measure.leaveCritical(); Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); } @@ -52,7 +52,6 @@ public class CriticalMeasureTest { long time = System.nanoTime(); measure.enterCritical(); CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5)); - CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(10)); Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false)); measure.leaveCritical(); } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java new file mode 100644 index 0000000000..5e1370cb97 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/MultiThreadCriticalMeasureTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.critical; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +import org.apache.activemq.artemis.utils.ReusableLatch; +import org.apache.activemq.artemis.utils.Wait; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Test; + +public class MultiThreadCriticalMeasureTest { + + private static final Logger logger = Logger.getLogger(MultiThreadCriticalMeasureTest.class); + + @Test + public void testMultiThread() throws Throwable { + int THREADS = 100; + AtomicInteger errors = new AtomicInteger(0); + Thread[] threads = new Thread[THREADS]; + AtomicBoolean running = new AtomicBoolean(true); + ReusableLatch latch = new ReusableLatch(0); + ReusableLatch latchOnMeasure = new ReusableLatch(0); + try { + CriticalMeasure measure = new CriticalMeasure((t, r) -> false, 0); + + CyclicBarrier barrier = new CyclicBarrier(THREADS + 1); + + Runnable runnable = () -> { + try { + logger.debug("Thread " + Thread.currentThread().getName() + " waiting to Star"); + barrier.await(); + logger.debug("Thread " + Thread.currentThread().getName() + " Started"); + while (running.get()) { + if (!latch.await(1, TimeUnit.NANOSECONDS)) { + latch.await(); + } + + measure.enterCritical(); + latchOnMeasure.await(); + measure.leaveCritical(); + } + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }; + + for (int i = 0; i < THREADS; i++) { + threads[i] = new Thread(runnable, "t=" + i); + threads[i].start(); + } + + logger.debug("Going to release it now"); + barrier.await(); + + for (int i = 0; i < 50; i++) { + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10)); + logger.debug("Count up " + i); + + // simulating load down on the system... this will freeze load + latch.countUp(); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(20)); + Assert.assertFalse(measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(10), false)); + logger.debug("Count down"); + + // this will resume load + latch.countDown(); + } + + latchOnMeasure.countUp(); + + Assert.assertTrue(Wait.waitFor(() -> measure.checkExpiration(TimeUnit.MILLISECONDS.toNanos(100), false), 1_000, 1)); + + } finally { + latch.countDown(); + latchOnMeasure.countDown(); + running.set(false); + + Assert.assertEquals(0, errors.get()); + + for (Thread t : threads) { + if (t != null) { + t.join(100); + if (t.isAlive()) { + t.interrupt(); + } + } + } + + } + + } +}