This closes #3061
This commit is contained in:
commit
a30fc81a61
artemis-commons/src
main/java/org/apache/activemq/artemis/utils/critical
test/java/org/apache/activemq/artemis/utils/critical
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical
|
@ -136,8 +136,8 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
try {
|
||||
for (CriticalComponent component : components) {
|
||||
|
||||
if (component.isExpired(timeoutNanoSeconds)) {
|
||||
fireAction(component);
|
||||
if (component.checkExpiration(timeoutNanoSeconds, true)) {
|
||||
fireActions(component);
|
||||
// no need to keep running if there's already a component failed
|
||||
return;
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
}
|
||||
}
|
||||
|
||||
private void fireAction(CriticalComponent component) {
|
||||
private void fireActions(CriticalComponent component) {
|
||||
for (CriticalAction action : actions) {
|
||||
try {
|
||||
action.run(component);
|
||||
|
@ -157,14 +157,11 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
|
|||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
actions.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
scheduledComponent.start();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,9 +49,10 @@ public interface CriticalComponent {
|
|||
}
|
||||
|
||||
/**
|
||||
* Is this Component expired at a given timeout.. on any of its paths.
|
||||
* @param timeout
|
||||
* Check if the component is expired at a given timeout.. on any of its paths.
|
||||
* @param timeout - the timeout to check if the component is expired
|
||||
* @param reset - true to reset the component timer if it is expired
|
||||
* @return -1 if it's ok, or the number of the path it failed
|
||||
*/
|
||||
boolean isExpired(long timeout);
|
||||
boolean checkExpiration(long timeout, boolean reset);
|
||||
}
|
|
@ -62,9 +62,9 @@ public class CriticalComponentImpl implements CriticalComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean isExpired(long timeout) {
|
||||
public boolean checkExpiration(long timeout, boolean reset) {
|
||||
for (int i = 0; i < measures.length; i++) {
|
||||
if (measures[i].isExpired(timeout)) {
|
||||
if (measures[i].checkExpiration(timeout, reset)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ public class CriticalMeasure {
|
|||
CriticalAnalyzer analyzer = component != null ? component.getCriticalAnalyzer() : null;
|
||||
if (analyzer != null) {
|
||||
long nanoTimeout = analyzer.getTimeoutNanoSeconds();
|
||||
if (isExpired(nanoTimeout)) {
|
||||
if (checkExpiration(nanoTimeout, false)) {
|
||||
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, leaving at", new Exception("entered"));
|
||||
logger.trace("Path " + id + " on component " + getComponentName() + " is taking too long, entered at", traceEnter);
|
||||
}
|
||||
|
@ -83,7 +83,8 @@ public class CriticalMeasure {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isExpired(long timeout) {
|
||||
public boolean checkExpiration(long timeout, boolean reset) {
|
||||
long time = System.nanoTime();
|
||||
final long timeLeft = TIME_LEFT_UPDATER.get(this);
|
||||
final long timeEnter = TIME_ENTER_UPDATER.get(this);
|
||||
//due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing
|
||||
|
@ -91,13 +92,18 @@ public class CriticalMeasure {
|
|||
boolean expired = System.nanoTime() - timeEnter > timeout;
|
||||
|
||||
if (expired) {
|
||||
Exception lastTraceEnter = this.traceEnter;
|
||||
|
||||
Exception thistraceEnter = this.traceEnter;
|
||||
if (thistraceEnter != null) {
|
||||
logger.warn("Component " + getComponentName() + " is expired on path " + id, thistraceEnter);
|
||||
if (lastTraceEnter != null) {
|
||||
logger.warn("Component " + getComponentName() + " is expired on path " + id, lastTraceEnter);
|
||||
} else {
|
||||
logger.warn("Component " + getComponentName() + " is expired on path " + id);
|
||||
}
|
||||
|
||||
if (reset) {
|
||||
TIME_LEFT_UPDATER.lazySet(this, time);
|
||||
TIME_ENTER_UPDATER.lazySet(this, time);
|
||||
}
|
||||
}
|
||||
return expired;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.activemq.artemis.utils.critical;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -45,7 +46,7 @@ public class CriticalAnalyzerTest {
|
|||
analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
|
||||
analyzer.add(new CriticalComponent() {
|
||||
@Override
|
||||
public boolean isExpired(long timeout) {
|
||||
public boolean checkExpiration(long timeout, boolean reset) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
@ -66,7 +67,15 @@ public class CriticalAnalyzerTest {
|
|||
|
||||
@Test
|
||||
public void testActionOnImpl() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
|
||||
|
||||
analyzer.addAction((CriticalComponent comp) -> {
|
||||
System.out.println("component " + comp + " received");
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
|
||||
analyzer.add(component);
|
||||
|
||||
|
@ -74,15 +83,9 @@ public class CriticalAnalyzerTest {
|
|||
component.leaveCritical(0);
|
||||
component.enterCritical(1);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
analyzer.start();
|
||||
|
||||
analyzer.addAction((CriticalComponent comp) -> {
|
||||
System.out.println("component " + comp + " received");
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
|
||||
|
||||
analyzer.stop();
|
||||
|
@ -93,7 +96,7 @@ public class CriticalAnalyzerTest {
|
|||
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
|
||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
|
||||
component.enterCritical(0);
|
||||
Assert.assertFalse(component.isExpired(TimeUnit.MINUTES.toNanos(1)));
|
||||
Assert.assertFalse(component.checkExpiration(TimeUnit.MINUTES.toNanos(1), false));
|
||||
analyzer.stop();
|
||||
|
||||
}
|
||||
|
@ -104,32 +107,62 @@ public class CriticalAnalyzerTest {
|
|||
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
|
||||
component.enterCritical(0);
|
||||
Thread.sleep(50);
|
||||
Assert.assertTrue(component.isExpired(0));
|
||||
Assert.assertTrue(component.checkExpiration(0, false));
|
||||
analyzer.stop();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNegative() throws Exception {
|
||||
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
|
||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
|
||||
analyzer.add(component);
|
||||
|
||||
component.enterCritical(0);
|
||||
component.leaveCritical(0);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
analyzer.start();
|
||||
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
|
||||
|
||||
analyzer.addAction((CriticalComponent comp) -> {
|
||||
System.out.println("component " + comp + " received");
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
|
||||
analyzer.add(component);
|
||||
|
||||
component.enterCritical(0);
|
||||
component.leaveCritical(0);
|
||||
|
||||
analyzer.start();
|
||||
|
||||
Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
analyzer.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPositive() throws Exception {
|
||||
ReusableLatch latch = new ReusableLatch(1);
|
||||
|
||||
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
|
||||
|
||||
analyzer.addAction((CriticalComponent comp) -> {
|
||||
System.out.println("component " + comp + " received");
|
||||
latch.countDown();
|
||||
});
|
||||
|
||||
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
|
||||
analyzer.add(component);
|
||||
|
||||
component.enterCritical(0);
|
||||
Thread.sleep(50);
|
||||
|
||||
analyzer.start();
|
||||
|
||||
Assert.assertTrue(latch.await(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
component.leaveCritical(0);
|
||||
|
||||
latch.setCount(1);
|
||||
|
||||
Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
analyzer.stop();
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ public class CriticalMeasureTest {
|
|||
long time = System.nanoTime();
|
||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
||||
CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time);
|
||||
Assert.assertFalse(measure.isExpired(TimeUnit.SECONDS.toNanos(30)));
|
||||
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -41,7 +41,7 @@ public class CriticalMeasureTest {
|
|||
long time = System.nanoTime();
|
||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
||||
measure.leaveCritical();
|
||||
Assert.assertFalse(measure.isExpired(TimeUnit.SECONDS.toNanos(30)));
|
||||
Assert.assertFalse(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -53,7 +53,7 @@ public class CriticalMeasureTest {
|
|||
measure.enterCritical();
|
||||
CriticalMeasure.TIME_ENTER_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(5));
|
||||
CriticalMeasure.TIME_LEFT_UPDATER.set(measure, time - TimeUnit.MINUTES.toNanos(10));
|
||||
Assert.assertTrue(measure.isExpired(TimeUnit.SECONDS.toNanos(30)));
|
||||
Assert.assertTrue(measure.checkExpiration(TimeUnit.SECONDS.toNanos(30), false));
|
||||
measure.leaveCritical();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1265,6 +1265,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
}
|
||||
|
||||
try {
|
||||
this.analyzer.clear();
|
||||
this.analyzer.stop();
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
|
|
|
@ -54,7 +54,7 @@ public class CriticalSimpleTest extends ActiveMQTestBase {
|
|||
|
||||
server.getCriticalAnalyzer().add(new CriticalComponent() {
|
||||
@Override
|
||||
public boolean isExpired(long timeout) {
|
||||
public boolean checkExpiration(long timeout, boolean reset) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
@ -83,7 +83,7 @@ public class CriticalSimpleTest extends ActiveMQTestBase {
|
|||
try {
|
||||
server.getCriticalAnalyzer().add(new CriticalComponent() {
|
||||
@Override
|
||||
public boolean isExpired(long timeout) {
|
||||
public boolean checkExpiration(long timeout, boolean reset) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue