https://issues.apache.org/jira/browse/AMQ-6084 - add broker.adjustUsageLimits to disable the adjustment of limits to what is available. In this way, a broker will fail to start if constrained, ensuring it won't accept connections and block pending resources if it has earlier exited for that reason

(cherry picked from commit d7febddb67)
This commit is contained in:
gtully 2015-12-10 11:23:56 +00:00 committed by Timothy Bish
parent 99fce5bae9
commit 2be754583c
3 changed files with 127 additions and 53 deletions

View File

@ -243,6 +243,7 @@ public class BrokerService implements Service {
private int maxPurgedDestinationsPerSweep = 0; private int maxPurgedDestinationsPerSweep = 0;
private int schedulePeriodForDiskUsageCheck = 0; private int schedulePeriodForDiskUsageCheck = 0;
private int diskUsageCheckRegrowThreshold = -1; private int diskUsageCheckRegrowThreshold = -1;
private boolean adjustUsageLimits = true;
private BrokerContext brokerContext; private BrokerContext brokerContext;
private boolean networkConnectorStartAsync = false; private boolean networkConnectorStartAsync = false;
private boolean allowTempAutoCreationOnSend; private boolean allowTempAutoCreationOnSend;
@ -591,6 +592,7 @@ public class BrokerService implements Service {
MDC.put("activemq.broker", brokerName); MDC.put("activemq.broker", brokerName);
try { try {
checkMemorySystemUsageLimits();
if (systemExitOnShutdown && useShutdownHook) { if (systemExitOnShutdown && useShutdownHook) {
throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
} }
@ -740,7 +742,7 @@ public class BrokerService implements Service {
LOG.info("For help or more information please see: http://activemq.apache.org"); LOG.info("For help or more information please see: http://activemq.apache.org");
getBroker().brokerServiceStarted(); getBroker().brokerServiceStarted();
checkSystemUsageLimits(); checkStoreSystemUsageLimits();
startedLatch.countDown(); startedLatch.countDown();
getBroker().nowMasterBroker(); getBroker().nowMasterBroker();
} }
@ -1973,7 +1975,7 @@ public class BrokerService implements Service {
* Check that the store usage limit is not greater than max usable * Check that the store usage limit is not greater than max usable
* space and adjust if it is * space and adjust if it is
*/ */
protected void checkStoreUsageLimits() throws IOException { protected void checkStoreUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage(); final SystemUsage usage = getSystemUsage();
if (getPersistenceAdapter() != null) { if (getPersistenceAdapter() != null) {
@ -2001,7 +2003,7 @@ public class BrokerService implements Service {
* Check that temporary usage limit is not greater than max usable * Check that temporary usage limit is not greater than max usable
* space and adjust if it is * space and adjust if it is
*/ */
protected void checkTmpStoreUsageLimits() throws IOException { protected void checkTmpStoreUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage(); final SystemUsage usage = getSystemUsage();
File tmpDir = getTmpDataDirectory(); File tmpDir = getTmpDataDirectory();
@ -2030,7 +2032,7 @@ public class BrokerService implements Service {
} }
} }
protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) { protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException {
if (dir != null) { if (dir != null) {
dir = StoreUtil.findParentDirectory(dir); dir = StoreUtil.findParentDirectory(dir);
String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
@ -2064,6 +2066,17 @@ public class BrokerService implements Service {
//check if the limit is too large for the amount of usable space //check if the limit is too large for the amount of usable space
} else if (storeLimit > totalUsableSpace) { } else if (storeLimit > totalUsableSpace) {
final String message = storeName + " limit is " + storeLimit / oneMeg
+ " mb (current store usage is " + storeCurrent / oneMeg
+ " mb). The data directory: " + dir.getAbsolutePath()
+ " only has " + totalUsableSpace / oneMeg
+ " mb of usable space.";
if (!isAdjustUsageLimits()) {
LOG.error(message);
throw new ConfigurationException(message);
}
if (percentLimit > 0) { if (percentLimit > 0) {
LOG.warn(storeName + " limit has been set to " LOG.warn(storeName + " limit has been set to "
+ percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
@ -2072,14 +2085,10 @@ public class BrokerService implements Service {
+ " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
+ " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
+ " is available - resetting limit"); + " is available - resetting limit");
} else {
LOG.warn(message + " - resetting to maximum available disk space: " +
totalUsableSpace / oneMeg + " mb");
} }
LOG.warn(storeName + " limit is " + storeLimit / oneMeg +
" mb (current store usage is " + storeCurrent / oneMeg +
" mb). The data directory: " + dir.getAbsolutePath() +
" only has " + totalUsableSpace / oneMeg +
" mb of usable space - resetting to maximum available disk space: " +
totalUsableSpace / oneMeg + " mb");
storeUsage.setLimit(totalUsableSpace); storeUsage.setLimit(totalUsableSpace);
} }
} }
@ -2098,13 +2107,13 @@ public class BrokerService implements Service {
public void run() { public void run() {
try { try {
checkStoreUsageLimits(); checkStoreUsageLimits();
} catch (IOException e) { } catch (Exception e) {
LOG.error("Failed to check persistent disk usage limits", e); LOG.error("Failed to check persistent disk usage limits", e);
} }
try { try {
checkTmpStoreUsageLimits(); checkTmpStoreUsageLimits();
} catch (IOException e) { } catch (Exception e) {
LOG.error("Failed to check temporary store usage limits", e); LOG.error("Failed to check temporary store usage limits", e);
} }
} }
@ -2113,17 +2122,27 @@ public class BrokerService implements Service {
} }
} }
protected void checkSystemUsageLimits() throws IOException { protected void checkMemorySystemUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage(); final SystemUsage usage = getSystemUsage();
long memLimit = usage.getMemoryUsage().getLimit(); long memLimit = usage.getMemoryUsage().getLimit();
long jvmLimit = Runtime.getRuntime().maxMemory(); long jvmLimit = Runtime.getRuntime().maxMemory();
if (memLimit > jvmLimit) { if (memLimit > jvmLimit) {
usage.getMemoryUsage().setPercentOfJvmHeap(70); final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
" mb) is more than the maximum available for the JVM: " +
jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); if (adjustUsageLimits) {
usage.getMemoryUsage().setPercentOfJvmHeap(70);
LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
} else {
LOG.error(message);
throw new ConfigurationException(message);
}
} }
}
protected void checkStoreSystemUsageLimits() throws Exception {
final SystemUsage usage = getSystemUsage();
//Check the persistent store and temp store limits if they exist //Check the persistent store and temp store limits if they exist
//and schedule a periodic check to update disk limits if //and schedule a periodic check to update disk limits if
@ -3168,4 +3187,12 @@ public class BrokerService implements Service {
boolean useVirtualDestSubsOnCreation) { boolean useVirtualDestSubsOnCreation) {
this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
} }
public boolean isAdjustUsageLimits() {
return adjustUsageLimits;
}
public void setAdjustUsageLimits(boolean adjustUsageLimits) {
this.adjustUsageLimits = adjustUsageLimits;
}
} }

View File

@ -17,9 +17,12 @@
package org.apache.activemq.usage; package org.apache.activemq.usage;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.StoreUtil; import org.apache.activemq.util.StoreUtil;
@ -120,6 +123,21 @@ public class PercentDiskUsageLimitTest {
} }
} }
@Test(timeout=30000)
public void testStartFailDiskLimitOverMaxFree() throws Exception {
broker.setAdjustUsageLimits(false);
int freePercent = getFreePercentage();
if (freePercent > 1) {
storeUsage.setPercentLimit(freePercent + 1);
try {
startBroker();
fail("Expect ex");
} catch (ConfigurationException expected) {}
}
}
@Test(timeout=30000) @Test(timeout=30000)
public void testDiskLimitOver100Percent() throws Exception { public void testDiskLimitOver100Percent() throws Exception {
int freePercent = getFreePercentage(); int freePercent = getFreePercentage();

View File

@ -17,59 +17,88 @@
package org.apache.activemq.usage; package org.apache.activemq.usage;
import java.io.BufferedReader; import java.util.concurrent.CountDownLatch;
import java.io.File; import java.util.concurrent.TimeUnit;
import java.io.FileInputStream;
import java.io.InputStreamReader; import org.apache.activemq.ConfigurationException;
import java.nio.charset.Charset; import org.junit.Test;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
public class StoreUsageLimitsTest extends EmbeddedBrokerTestSupport {
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StoreUsageLimitsTest {
final int WAIT_TIME_MILLS = 20 * 1000; final int WAIT_TIME_MILLS = 20 * 1000;
private static final String limitsLogLevel = "warn"; private static final String limitsLogLevel = "warn";
final String toMatch = new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)));
@Override
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker(); BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE); broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE);
broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel); broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel);
broker.deleteAllMessages(); broker.deleteAllMessages();
return broker; return broker;
} }
@Override @Test
protected boolean isPersistent() {
return true;
}
public void testCheckLimitsLogLevel() throws Exception { public void testCheckLimitsLogLevel() throws Exception {
File file = new File("target/activemq-test.log"); final CountDownLatch foundMessage = new CountDownLatch(1);
if (!file.exists()) { DefaultTestAppender appender = new DefaultTestAppender() {
fail("target/activemq-test.log was not created."); @Override
} public void doAppend(LoggingEvent event) {
String message = (String) event.getMessage();
BufferedReader br = null; if (message.contains(toMatch) && event.getLevel().equals(Level.WARN)) {
boolean foundUsage = false; foundMessage.countDown();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charset.forName("UTF-8")));
String line = null;
while ((line = br.readLine()) != null) {
if (line.contains(new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)))) && line.contains(limitsLogLevel.toUpperCase())) {
foundUsage = true;
} }
} }
} catch (Exception e) { };
fail(e.getMessage());
} finally {
br.close();
}
if (!foundUsage) Logger.getRootLogger().addAppender(appender);
fail("checkLimitsLogLevel message did not write to log target/activemq-test.log"); BrokerService brokerService = createBroker();
brokerService.start();
brokerService.stop();
assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS));
Logger.getRootLogger().removeAppender(appender);
}
@Test
public void testCheckLimitsFailStart() throws Exception {
final CountDownLatch foundMessage = new CountDownLatch(1);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
String message = (String) event.getMessage();
if (message.contains(toMatch) && event.getLevel().equals(Level.ERROR)) {
foundMessage.countDown();
}
}
};
Logger.getRootLogger().addAppender(appender);
BrokerService brokerService = createBroker();
brokerService.setAdjustUsageLimits(false);
try {
brokerService.start();
fail("expect ConfigurationException");
} catch (ConfigurationException expected) {
assertTrue("exception message match", expected.getLocalizedMessage().contains(toMatch));
}
brokerService.stop();
assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS));
Logger.getRootLogger().removeAppender(appender);
} }
} }