https://issues.apache.org/jira/browse/AMQ-6084 - add broker.adjustUsageLimits to disable the adjustment of limits to what is available. In this way, a broker will fail to start if constrained, ensuring it won't accept connections and block pending resources if it has earlier exited for that reason

This commit is contained in:
gtully 2015-12-10 11:23:56 +00:00
parent 5db5f3e39a
commit d7febddb67
3 changed files with 127 additions and 53 deletions

View File

@ -243,6 +243,7 @@ public class BrokerService implements Service {
private int maxPurgedDestinationsPerSweep = 0;
private int schedulePeriodForDiskUsageCheck = 0;
private int diskUsageCheckRegrowThreshold = -1;
private boolean adjustUsageLimits = true;
private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend;
@ -591,6 +592,7 @@ public class BrokerService implements Service {
MDC.put("activemq.broker", brokerName);
try {
checkMemorySystemUsageLimits();
if (systemExitOnShutdown && useShutdownHook) {
throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
}
@ -740,7 +742,7 @@ public class BrokerService implements Service {
LOG.info("For help or more information please see: http://activemq.apache.org");
getBroker().brokerServiceStarted();
checkSystemUsageLimits();
checkStoreSystemUsageLimits();
startedLatch.countDown();
getBroker().nowMasterBroker();
}
@ -1973,7 +1975,7 @@ public class BrokerService implements Service {
* Check that the store usage limit is not greater than max usable
* space and adjust if it is
*/
protected void checkStoreUsageLimits() throws IOException {
protected void checkStoreUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage();
if (getPersistenceAdapter() != null) {
@ -2001,7 +2003,7 @@ public class BrokerService implements Service {
* Check that temporary usage limit is not greater than max usable
* space and adjust if it is
*/
protected void checkTmpStoreUsageLimits() throws IOException {
protected void checkTmpStoreUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage();
File tmpDir = getTmpDataDirectory();
@ -2030,7 +2032,7 @@ public class BrokerService implements Service {
}
}
protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) {
protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException {
if (dir != null) {
dir = StoreUtil.findParentDirectory(dir);
String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
@ -2064,6 +2066,17 @@ public class BrokerService implements Service {
//check if the limit is too large for the amount of usable space
} else if (storeLimit > totalUsableSpace) {
final String message = storeName + " limit is " + storeLimit / oneMeg
+ " mb (current store usage is " + storeCurrent / oneMeg
+ " mb). The data directory: " + dir.getAbsolutePath()
+ " only has " + totalUsableSpace / oneMeg
+ " mb of usable space.";
if (!isAdjustUsageLimits()) {
LOG.error(message);
throw new ConfigurationException(message);
}
if (percentLimit > 0) {
LOG.warn(storeName + " limit has been set to "
+ percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
@ -2072,14 +2085,10 @@ public class BrokerService implements Service {
+ " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
+ " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
+ " is available - resetting limit");
} else {
LOG.warn(message + " - resetting to maximum available disk space: " +
totalUsableSpace / oneMeg + " mb");
}
LOG.warn(storeName + " limit is " + storeLimit / oneMeg +
" mb (current store usage is " + storeCurrent / oneMeg +
" mb). The data directory: " + dir.getAbsolutePath() +
" only has " + totalUsableSpace / oneMeg +
" mb of usable space - resetting to maximum available disk space: " +
totalUsableSpace / oneMeg + " mb");
storeUsage.setLimit(totalUsableSpace);
}
}
@ -2098,13 +2107,13 @@ public class BrokerService implements Service {
public void run() {
try {
checkStoreUsageLimits();
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to check persistent disk usage limits", e);
}
try {
checkTmpStoreUsageLimits();
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to check temporary store usage limits", e);
}
}
@ -2113,17 +2122,27 @@ public class BrokerService implements Service {
}
}
protected void checkSystemUsageLimits() throws IOException {
protected void checkMemorySystemUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage();
long memLimit = usage.getMemoryUsage().getLimit();
long jvmLimit = Runtime.getRuntime().maxMemory();
if (memLimit > jvmLimit) {
usage.getMemoryUsage().setPercentOfJvmHeap(70);
LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
" mb) is more than the maximum available for the JVM: " +
jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
+ "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
if (adjustUsageLimits) {
usage.getMemoryUsage().setPercentOfJvmHeap(70);
LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
} else {
LOG.error(message);
throw new ConfigurationException(message);
}
}
}
protected void checkStoreSystemUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage();
//Check the persistent store and temp store limits if they exist
//and schedule a periodic check to update disk limits if
@ -3168,4 +3187,12 @@ public class BrokerService implements Service {
boolean useVirtualDestSubsOnCreation) {
this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
}
public boolean isAdjustUsageLimits() {
return adjustUsageLimits;
}
public void setAdjustUsageLimits(boolean adjustUsageLimits) {
this.adjustUsageLimits = adjustUsageLimits;
}
}

View File

@ -17,9 +17,12 @@
package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.StoreUtil;
@ -120,6 +123,21 @@ public class PercentDiskUsageLimitTest {
}
}
@Test(timeout=30000)
public void testStartFailDiskLimitOverMaxFree() throws Exception {
broker.setAdjustUsageLimits(false);
int freePercent = getFreePercentage();
if (freePercent > 1) {
storeUsage.setPercentLimit(freePercent + 1);
try {
startBroker();
fail("Expect ex");
} catch (ConfigurationException expected) {}
}
}
@Test(timeout=30000)
public void testDiskLimitOver100Percent() throws Exception {
int freePercent = getFreePercentage();

View File

@ -17,59 +17,88 @@
package org.apache.activemq.usage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ConfigurationException;
import org.junit.Test;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
public class StoreUsageLimitsTest extends EmbeddedBrokerTestSupport {
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StoreUsageLimitsTest {
final int WAIT_TIME_MILLS = 20 * 1000;
private static final String limitsLogLevel = "warn";
final String toMatch = new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)));
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE);
broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel);
broker.deleteAllMessages();
return broker;
}
@Override
protected boolean isPersistent() {
return true;
}
@Test
public void testCheckLimitsLogLevel() throws Exception {
File file = new File("target/activemq-test.log");
if (!file.exists()) {
fail("target/activemq-test.log was not created.");
}
BufferedReader br = null;
boolean foundUsage = false;
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charset.forName("UTF-8")));
String line = null;
while ((line = br.readLine()) != null) {
if (line.contains(new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)))) && line.contains(limitsLogLevel.toUpperCase())) {
foundUsage = true;
final CountDownLatch foundMessage = new CountDownLatch(1);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
String message = (String) event.getMessage();
if (message.contains(toMatch) && event.getLevel().equals(Level.WARN)) {
foundMessage.countDown();
}
}
} catch (Exception e) {
fail(e.getMessage());
} finally {
br.close();
}
};
if (!foundUsage)
fail("checkLimitsLogLevel message did not write to log target/activemq-test.log");
Logger.getRootLogger().addAppender(appender);
BrokerService brokerService = createBroker();
brokerService.start();
brokerService.stop();
assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS));
Logger.getRootLogger().removeAppender(appender);
}
@Test
public void testCheckLimitsFailStart() throws Exception {
final CountDownLatch foundMessage = new CountDownLatch(1);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
String message = (String) event.getMessage();
if (message.contains(toMatch) && event.getLevel().equals(Level.ERROR)) {
foundMessage.countDown();
}
}
};
Logger.getRootLogger().addAppender(appender);
BrokerService brokerService = createBroker();
brokerService.setAdjustUsageLimits(false);
try {
brokerService.start();
fail("expect ConfigurationException");
} catch (ConfigurationException expected) {
assertTrue("exception message match", expected.getLocalizedMessage().contains(toMatch));
}
brokerService.stop();
assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS));
Logger.getRootLogger().removeAppender(appender);
}
}