From 2be754583cc68c99f4feb7f2e880037c38921172 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 10 Dec 2015 11:23:56 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6084 - add broker.adjustUsageLimits to disable the adjustment of limits to what is available. In this way, a broker will fail to start if constrained, ensuring it won't accept connections and block pending resources if it has earlier exited for that reason (cherry picked from commit d7febddb6789171ccfd22faf0ee4f00f3e9c9490) --- .../apache/activemq/broker/BrokerService.java | 63 ++++++++---- .../usage/PercentDiskUsageLimitTest.java | 18 ++++ .../activemq/usage/StoreUsageLimitsTest.java | 99 ++++++++++++------- 3 files changed, 127 insertions(+), 53 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 806fa5a675..0c582a4795 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -243,6 +243,7 @@ public class BrokerService implements Service { private int maxPurgedDestinationsPerSweep = 0; private int schedulePeriodForDiskUsageCheck = 0; private int diskUsageCheckRegrowThreshold = -1; + private boolean adjustUsageLimits = true; private BrokerContext brokerContext; private boolean networkConnectorStartAsync = false; private boolean allowTempAutoCreationOnSend; @@ -591,6 +592,7 @@ public class BrokerService implements Service { MDC.put("activemq.broker", brokerName); try { + checkMemorySystemUsageLimits(); if (systemExitOnShutdown && useShutdownHook) { throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)"); } @@ -740,7 +742,7 @@ public class BrokerService implements Service { LOG.info("For help or more information please see: http://activemq.apache.org"); getBroker().brokerServiceStarted(); - checkSystemUsageLimits(); + checkStoreSystemUsageLimits(); startedLatch.countDown(); getBroker().nowMasterBroker(); } @@ -1973,7 +1975,7 @@ public class BrokerService implements Service { * Check that the store usage limit is not greater than max usable * space and adjust if it is */ - protected void checkStoreUsageLimits() throws IOException { + protected void checkStoreUsageLimits() throws Exception { final SystemUsage usage = getSystemUsage(); if (getPersistenceAdapter() != null) { @@ -2001,7 +2003,7 @@ public class BrokerService implements Service { * Check that temporary usage limit is not greater than max usable * space and adjust if it is */ - protected void checkTmpStoreUsageLimits() throws IOException { + protected void checkTmpStoreUsageLimits() throws Exception { final SystemUsage usage = getSystemUsage(); File tmpDir = getTmpDataDirectory(); @@ -2030,7 +2032,7 @@ public class BrokerService implements Service { } } - protected void checkUsageLimit(File dir, Usage storeUsage, int percentLimit) { + protected void checkUsageLimit(File dir, Usage storeUsage, int percentLimit) throws ConfigurationException { if (dir != null) { dir = StoreUtil.findParentDirectory(dir); String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store"; @@ -2064,6 +2066,17 @@ public class BrokerService implements Service { //check if the limit is too large for the amount of usable space } else if (storeLimit > totalUsableSpace) { + final String message = storeName + " limit is " + storeLimit / oneMeg + + " mb (current store usage is " + storeCurrent / oneMeg + + " mb). The data directory: " + dir.getAbsolutePath() + + " only has " + totalUsableSpace / oneMeg + + " mb of usable space."; + + if (!isAdjustUsageLimits()) { + LOG.error(message); + throw new ConfigurationException(message); + } + if (percentLimit > 0) { LOG.warn(storeName + " limit has been set to " + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)" @@ -2072,14 +2085,10 @@ public class BrokerService implements Service { + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)" + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)" + " is available - resetting limit"); + } else { + LOG.warn(message + " - resetting to maximum available disk space: " + + totalUsableSpace / oneMeg + " mb"); } - - LOG.warn(storeName + " limit is " + storeLimit / oneMeg + - " mb (current store usage is " + storeCurrent / oneMeg + - " mb). The data directory: " + dir.getAbsolutePath() + - " only has " + totalUsableSpace / oneMeg + - " mb of usable space - resetting to maximum available disk space: " + - totalUsableSpace / oneMeg + " mb"); storeUsage.setLimit(totalUsableSpace); } } @@ -2098,13 +2107,13 @@ public class BrokerService implements Service { public void run() { try { checkStoreUsageLimits(); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed to check persistent disk usage limits", e); } try { checkTmpStoreUsageLimits(); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed to check temporary store usage limits", e); } } @@ -2113,17 +2122,27 @@ public class BrokerService implements Service { } } - protected void checkSystemUsageLimits() throws IOException { + protected void checkMemorySystemUsageLimits() throws Exception { final SystemUsage usage = getSystemUsage(); long memLimit = usage.getMemoryUsage().getLimit(); long jvmLimit = Runtime.getRuntime().maxMemory(); if (memLimit > jvmLimit) { - usage.getMemoryUsage().setPercentOfJvmHeap(70); - LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) + - " mb) is more than the maximum available for the JVM: " + - jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); + final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024) + + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024); + + if (adjustUsageLimits) { + usage.getMemoryUsage().setPercentOfJvmHeap(70); + LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb"); + } else { + LOG.error(message); + throw new ConfigurationException(message); + } } + } + + protected void checkStoreSystemUsageLimits() throws Exception { + final SystemUsage usage = getSystemUsage(); //Check the persistent store and temp store limits if they exist //and schedule a periodic check to update disk limits if @@ -3168,4 +3187,12 @@ public class BrokerService implements Service { boolean useVirtualDestSubsOnCreation) { this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; } + + public boolean isAdjustUsageLimits() { + return adjustUsageLimits; + } + + public void setAdjustUsageLimits(boolean adjustUsageLimits) { + this.adjustUsageLimits = adjustUsageLimits; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java index 5eb830cf5f..ad2a89b874 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/PercentDiskUsageLimitTest.java @@ -17,9 +17,12 @@ package org.apache.activemq.usage; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import java.io.File; +import org.apache.activemq.ConfigurationException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.util.StoreUtil; @@ -120,6 +123,21 @@ public class PercentDiskUsageLimitTest { } } + @Test(timeout=30000) + public void testStartFailDiskLimitOverMaxFree() throws Exception { + broker.setAdjustUsageLimits(false); + int freePercent = getFreePercentage(); + + if (freePercent > 1) { + storeUsage.setPercentLimit(freePercent + 1); + + try { + startBroker(); + fail("Expect ex"); + } catch (ConfigurationException expected) {} + } + } + @Test(timeout=30000) public void testDiskLimitOver100Percent() throws Exception { int freePercent = getFreePercentage(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java index 80bb0a389e..22d5fd0010 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageLimitsTest.java @@ -17,59 +17,88 @@ package org.apache.activemq.usage; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.nio.charset.Charset; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.ConfigurationException; +import org.junit.Test; -import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; -public class StoreUsageLimitsTest extends EmbeddedBrokerTestSupport { + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StoreUsageLimitsTest { final int WAIT_TIME_MILLS = 20 * 1000; private static final String limitsLogLevel = "warn"; + final String toMatch = new String(Long.toString(Long.MAX_VALUE / (1024 * 1024))); - @Override protected BrokerService createBroker() throws Exception { - BrokerService broker = super.createBroker(); + BrokerService broker = new BrokerService(); + broker.setPersistent(false); broker.getSystemUsage().getMemoryUsage().setLimit(Long.MAX_VALUE); broker.getSystemUsage().setCheckLimitsLogLevel(limitsLogLevel); broker.deleteAllMessages(); return broker; } - @Override - protected boolean isPersistent() { - return true; - } - + @Test public void testCheckLimitsLogLevel() throws Exception { - File file = new File("target/activemq-test.log"); - if (!file.exists()) { - fail("target/activemq-test.log was not created."); - } - - BufferedReader br = null; - boolean foundUsage = false; - - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(file), Charset.forName("UTF-8"))); - String line = null; - while ((line = br.readLine()) != null) { - if (line.contains(new String(Long.toString(Long.MAX_VALUE / (1024 * 1024)))) && line.contains(limitsLogLevel.toUpperCase())) { - foundUsage = true; + final CountDownLatch foundMessage = new CountDownLatch(1); + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + String message = (String) event.getMessage(); + if (message.contains(toMatch) && event.getLevel().equals(Level.WARN)) { + foundMessage.countDown(); } } - } catch (Exception e) { - fail(e.getMessage()); - } finally { - br.close(); - } + }; - if (!foundUsage) - fail("checkLimitsLogLevel message did not write to log target/activemq-test.log"); + Logger.getRootLogger().addAppender(appender); + BrokerService brokerService = createBroker(); + brokerService.start(); + brokerService.stop(); + + assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS)); + + Logger.getRootLogger().removeAppender(appender); + } + + @Test + public void testCheckLimitsFailStart() throws Exception { + + final CountDownLatch foundMessage = new CountDownLatch(1); + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + String message = (String) event.getMessage(); + if (message.contains(toMatch) && event.getLevel().equals(Level.ERROR)) { + foundMessage.countDown(); + } + } + }; + + Logger.getRootLogger().addAppender(appender); + BrokerService brokerService = createBroker(); + brokerService.setAdjustUsageLimits(false); + try { + brokerService.start(); + fail("expect ConfigurationException"); + } catch (ConfigurationException expected) { + assertTrue("exception message match", expected.getLocalizedMessage().contains(toMatch)); + } + brokerService.stop(); + + assertTrue("Fount log message", foundMessage.await(WAIT_TIME_MILLS, TimeUnit.MILLISECONDS)); + + Logger.getRootLogger().removeAppender(appender); } }