This commit is contained in:
Clebert Suconic 2017-09-07 10:11:35 -04:00
commit 65a0c6104a
7 changed files with 89 additions and 52 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils.critical;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
public interface CriticalAnalyzer extends ActiveMQComponent {
@ -29,13 +31,13 @@ public interface CriticalAnalyzer extends ActiveMQComponent {
void remove(CriticalComponent component);
CriticalAnalyzer setCheckTime(long timeout);
CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);
long getCheckTime();
long getCheckTimeNanoSeconds();
CriticalAnalyzer setTimeout(long timeout);
CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);
long getTimeout();
long getTimeout(TimeUnit unit);
CriticalAnalyzer addAction(CriticalAction action);

View File

@ -29,9 +29,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);
private volatile long timeout;
private volatile long timeoutNanoSeconds;
private volatile long checkTime;
private volatile long checkTimeNanoSeconds;
@Override
public void clear() {
@ -63,31 +63,31 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
}
@Override
public CriticalAnalyzer setCheckTime(long timeout) {
this.checkTime = timeout;
public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
this.checkTimeNanoSeconds = timeout;
return this;
}
@Override
public long getCheckTime() {
if (checkTime == 0) {
checkTime = getTimeout() / 2;
public long getCheckTimeNanoSeconds() {
if (checkTimeNanoSeconds == 0) {
checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
}
return checkTime;
return checkTimeNanoSeconds;
}
@Override
public CriticalAnalyzer setTimeout(long timeout) {
this.timeout = timeout;
public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
this.timeoutNanoSeconds = unit.toNanos(timeout);
return this;
}
@Override
public long getTimeout() {
if (timeout == 0) {
timeout = TimeUnit.MINUTES.toMillis(2);
public long getTimeout(TimeUnit unit) {
if (timeoutNanoSeconds == 0) {
timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
}
return timeout;
return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
}
@Override
@ -103,7 +103,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
try {
for (CriticalComponent component : components) {
if (component.isExpired(timeout)) {
if (component.isExpired(timeoutNanoSeconds)) {
fireAction(component);
// no need to keep running if there's already a component failed
return;
@ -142,7 +142,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
public void run() {
try {
while (true) {
if (running.tryAcquire(getCheckTime(), TimeUnit.MILLISECONDS)) {
if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) {
running.release();
// this means that the server has been stopped as we could acquire the semaphore... returning now
break;

View File

@ -20,7 +20,7 @@ package org.apache.activemq.artemis.utils.critical;
/**
* A Critical component enters and leaves a critical state.
* You update a long every time you enter a critical path
* you update a different long with a System.currentMillis every time you leave that path.
* you update a different long with a System.nanoTime every time you leave that path.
*
* If the enterCritical > leaveCritical at any point, then you need to measure the timeout.
* if the system stops responding, then you have something irresponsive at the system.

View File

@ -17,36 +17,48 @@
package org.apache.activemq.artemis.utils.critical;
import org.jboss.logging.Logger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class CriticalMeasure {
private static final Logger logger = Logger.getLogger(CriticalMeasure.class);
//uses updaters to avoid creates many AtomicLong instances
private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter");
private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft");
private volatile long timeEnter;
private volatile long timeLeft;
public CriticalMeasure() {
//prefer this approach instead of using some fixed value because System::nanoTime could change sign
//with long running processes
enterCritical();
leaveCritical();
}
public void enterCritical() {
timeEnter = System.currentTimeMillis();
//prefer lazySet in order to avoid heavy-weight full barriers on x86
TIME_ENTER_UPDATER.lazySet(this, System.nanoTime());
}
public void leaveCritical() {
timeLeft = System.currentTimeMillis();
TIME_LEFT_UPDATER.lazySet(this, System.nanoTime());
}
public boolean isExpired(long timeout) {
if (timeEnter > timeLeft) {
return System.currentTimeMillis() - timeEnter > timeout;
final long timeLeft = TIME_LEFT_UPDATER.get(this);
final long timeEnter = TIME_ENTER_UPDATER.get(this);
//due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing
if (timeLeft - timeEnter < 0) {
return System.nanoTime() - timeEnter > timeout;
}
return false;
}
public long enterTime() {
return timeEnter;
return TIME_ENTER_UPDATER.get(this);
}
public long leaveTime() {
return timeLeft;
return TIME_LEFT_UPDATER.get(this);
}
}

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils.critical;
import java.util.concurrent.TimeUnit;
public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();
@ -59,22 +61,22 @@ public class EmptyCriticalAnalyzer implements CriticalAnalyzer {
}
@Override
public CriticalAnalyzer setCheckTime(long timeout) {
return this;
public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
return null;
}
@Override
public long getCheckTime() {
public long getCheckTimeNanoSeconds() {
return 0;
}
@Override
public CriticalAnalyzer setTimeout(long timeout) {
return this;
public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
return null;
}
@Override
public long getTimeout() {
public long getTimeout(TimeUnit unit) {
return 0;
}

View File

@ -42,7 +42,7 @@ public class CriticalAnalyzerTest {
@Test
public void testAction() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(100).setCheckTime(50);
analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS);
analyzer.add(new CriticalComponent() {
@Override
public boolean isExpired(long timeout) {
@ -66,7 +66,7 @@ public class CriticalAnalyzerTest {
@Test
public void testActionOnImpl() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
analyzer.add(component);
@ -88,9 +88,30 @@ public class CriticalAnalyzerTest {
analyzer.stop();
}
@Test
public void testEnterNoLeaveNoExpire() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
component.enterCritical(0);
Assert.assertFalse(component.isExpired(TimeUnit.MINUTES.toNanos(1)));
analyzer.stop();
}
@Test
public void testEnterNoLeaveExpire() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 2);
component.enterCritical(0);
Thread.sleep(50);
Assert.assertTrue(component.isExpired(0));
analyzer.stop();
}
@Test
public void testNegative() throws Exception {
analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5);
analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS);
CriticalComponent component = new CriticalComponentImpl(analyzer, 1);
analyzer.add(component);
@ -111,4 +132,4 @@ public class CriticalAnalyzerTest {
analyzer.stop();
}
}
}

View File

@ -506,7 +506,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/** Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
this.analyzer.clear();
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod()).setTimeout(configuration.getCriticalAnalyzerTimeout());
this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);
if (configuration.isCriticalAnalyzer()) {
this.getCriticalAnalyzer().start();
@ -1437,7 +1437,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null);
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes);
@ -1838,7 +1838,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
removeConsumers, autoDeleteAddress) : null);
addressSettingsRepository.clearCache();
@ -1882,7 +1882,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callPostQueueDeletionCallbacks(address, queueName);
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
removeConsumers, autoDeleteAddress) : null);
}
@Override
@ -2456,13 +2456,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception {
Set<String> addressesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());
.map(CoreAddressConfiguration::getName)
.collect(Collectors.toSet());
Set<String> queuesInConfig = configuration.getAddressConfigurations().stream()
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());
.map(CoreAddressConfiguration::getQueueConfigurations)
.flatMap(List::stream).map(CoreQueueConfiguration::getName)
.collect(Collectors.toSet());
for (SimpleString addressName : listAddressNames()) {
AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
@ -2521,8 +2521,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers());
if (queue == null) {
queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
queueName, SimpleString.toSimpleString(config.getFilterString()), null,
config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
queueName, SimpleString.toSimpleString(config.getFilterString()), null,
config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true);
}
return queue;
}
@ -2990,4 +2990,4 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
}
}