mirror of https://github.com/apache/activemq.git
Adding a property called schedulePeriodForDiskUsageCheck which can be set to a time period to periodically check disk usage limits and adjust if the amount of disk space has been reduced.
This commit is contained in:
parent
f45ce09655
commit
61fd811adc
|
@ -229,6 +229,7 @@ public class BrokerService implements Service {
|
|||
private ThreadPoolExecutor executor;
|
||||
private int schedulePeriodForDestinationPurge= 0;
|
||||
private int maxPurgedDestinationsPerSweep = 0;
|
||||
private int schedulePeriodForDiskUsageCheck = 0;
|
||||
private BrokerContext brokerContext;
|
||||
private boolean networkConnectorStartAsync = false;
|
||||
private boolean allowTempAutoCreationOnSend;
|
||||
|
@ -1953,17 +1954,12 @@ public class BrokerService implements Service {
|
|||
}
|
||||
}
|
||||
|
||||
protected void checkSystemUsageLimits() throws IOException {
|
||||
SystemUsage usage = getSystemUsage();
|
||||
long memLimit = usage.getMemoryUsage().getLimit();
|
||||
long jvmLimit = Runtime.getRuntime().maxMemory();
|
||||
|
||||
if (memLimit > jvmLimit) {
|
||||
usage.getMemoryUsage().setPercentOfJvmHeap(70);
|
||||
LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
|
||||
" mb) is more than the maximum available for the JVM: " +
|
||||
jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
|
||||
}
|
||||
/**
|
||||
* Check that the store usage limit is not greater than max usable
|
||||
* space and adjust if it is
|
||||
*/
|
||||
protected void checkStoreUsageLimits() throws IOException {
|
||||
final SystemUsage usage = getSystemUsage();
|
||||
|
||||
if (getPersistenceAdapter() != null) {
|
||||
PersistenceAdapter adapter = getPersistenceAdapter();
|
||||
|
@ -2007,6 +2003,14 @@ public class BrokerService implements Service {
|
|||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that temporary usage limit is not greater than max usable
|
||||
* space and adjust if it is
|
||||
*/
|
||||
protected void checkTmpStoreUsageLimits() throws IOException {
|
||||
final SystemUsage usage = getSystemUsage();
|
||||
|
||||
File tmpDir = getTmpDataDirectory();
|
||||
if (tmpDir != null) {
|
||||
|
@ -2047,6 +2051,54 @@ public class BrokerService implements Service {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
|
||||
* update store and temporary store limits if the amount of available space
|
||||
* plus current store size is less than the existin configured limit
|
||||
*/
|
||||
protected void scheduleDiskUsageLimitsCheck() throws IOException {
|
||||
if (schedulePeriodForDiskUsageCheck > 0 &&
|
||||
(getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
|
||||
Runnable diskLimitCheckTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
checkStoreUsageLimits();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to check persistent disk usage limits", e);
|
||||
}
|
||||
|
||||
try {
|
||||
checkTmpStoreUsageLimits();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to check temporary store usage limits", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
|
||||
}
|
||||
}
|
||||
|
||||
protected void checkSystemUsageLimits() throws IOException {
|
||||
final SystemUsage usage = getSystemUsage();
|
||||
long memLimit = usage.getMemoryUsage().getLimit();
|
||||
long jvmLimit = Runtime.getRuntime().maxMemory();
|
||||
|
||||
if (memLimit > jvmLimit) {
|
||||
usage.getMemoryUsage().setPercentOfJvmHeap(70);
|
||||
LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
|
||||
" mb) is more than the maximum available for the JVM: " +
|
||||
jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
|
||||
}
|
||||
|
||||
//Check the persistent store and temp store limits if they exist
|
||||
//and schedule a periodic check to update disk limits if
|
||||
//schedulePeriodForDiskLimitCheck is set
|
||||
checkStoreUsageLimits();
|
||||
checkTmpStoreUsageLimits();
|
||||
scheduleDiskUsageLimitsCheck();
|
||||
|
||||
if (getJobSchedulerStore() != null) {
|
||||
JobSchedulerStore scheduler = getJobSchedulerStore();
|
||||
|
@ -2837,6 +2889,11 @@ public class BrokerService implements Service {
|
|||
this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
|
||||
}
|
||||
|
||||
public void setSchedulePeriodForDiskUsageCheck(
|
||||
int schedulePeriodForDiskUsageCheck) {
|
||||
this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
|
||||
}
|
||||
|
||||
public int getMaxPurgedDestinationsPerSweep() {
|
||||
return this.maxPurgedDestinationsPerSweep;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usage;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.usage.StoreUsage;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.usage.TempUsage;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* This test is for AMQ-5393 and will check that schedulePeriodForDiskLimitCheck
|
||||
* properly schedules a task that will update disk limits if the amount of usable disk space drops
|
||||
* because another process uses up disk space.
|
||||
*
|
||||
*/
|
||||
public class PeriodicDiskUsageLimitTest {
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
.getLogger(PeriodicDiskUsageLimitTest.class);
|
||||
|
||||
File dataFileDir = new File("target/test-amq-5393/datadb");
|
||||
File testfile = new File("target/test-amq-5393/testfile");
|
||||
private BrokerService broker;
|
||||
private PersistenceAdapter adapter;
|
||||
private TempUsage tempUsage;
|
||||
private StoreUsage storeUsage;
|
||||
|
||||
@Before
|
||||
public void setUpBroker() throws Exception {
|
||||
broker = new BrokerService();
|
||||
broker.setPersistent(true);
|
||||
broker.setDataDirectoryFile(dataFileDir);
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
adapter = broker.getPersistenceAdapter();
|
||||
|
||||
FileUtils.deleteQuietly(testfile);
|
||||
FileUtils.forceMkdir(adapter.getDirectory());
|
||||
FileUtils.forceMkdir(broker.getTempDataStore().getDirectory());
|
||||
|
||||
final SystemUsage systemUsage = broker.getSystemUsage();
|
||||
tempUsage = systemUsage.getTempUsage();
|
||||
storeUsage = systemUsage.getStoreUsage();
|
||||
}
|
||||
|
||||
protected void startBroker() throws Exception {
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
FileUtils.deleteQuietly(testfile);
|
||||
FileUtils.deleteQuietly(dataFileDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test will show that if a file is written to take away free space, and
|
||||
* if the usage limit is now less than the store size plus remaining free space, then
|
||||
* the usage limits will adjust lower.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testDiskUsageAdjustLower() throws Exception {
|
||||
//set the limit to max space so that if a file is added to eat up free space then
|
||||
//the broker should adjust the usage limit..set time to 5 seconds for testing
|
||||
setLimitMaxSpace();
|
||||
broker.setSchedulePeriodForDiskUsageCheck(4000);
|
||||
startBroker();
|
||||
|
||||
final long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit();
|
||||
final long originalTmp = broker.getSystemUsage().getTempUsage().getLimit();
|
||||
|
||||
//write a 1 meg file to the file system
|
||||
writeTestFile(1024 * 1024);
|
||||
|
||||
//Assert that the usage limits have been decreased because some free space was used
|
||||
//up by a file
|
||||
assertTrue("Store Usage should ramp down.", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return broker.getSystemUsage().getStoreUsage().getLimit() < originalDisk;
|
||||
}
|
||||
}));
|
||||
|
||||
assertTrue("Temp Usage should ramp down.", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return broker.getSystemUsage().getTempUsage().getLimit() < originalTmp;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* This test shows that the usage limits will not change if the
|
||||
* schedulePeriodForDiskLimitCheck property is not set because no task will run
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testDiskLimitCheckNotSet() throws Exception {
|
||||
setLimitMaxSpace();
|
||||
startBroker();
|
||||
|
||||
long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit();
|
||||
long originalTmp = broker.getSystemUsage().getTempUsage().getLimit();
|
||||
|
||||
//write a 1 meg file to the file system
|
||||
writeTestFile(1024 * 1024);
|
||||
Thread.sleep(3000);
|
||||
|
||||
//assert that the usage limits have not changed because a task should not have run
|
||||
assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit());
|
||||
assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test will show that if a file is written to take away free space, but
|
||||
* if the limit is greater than the store size and the remaining free space, then
|
||||
* the usage limits will not adjust.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testDiskUsageStaySame() throws Exception {
|
||||
//set a limit lower than max available space and set the period to 5 seconds
|
||||
tempUsage.setLimit(10000000);
|
||||
storeUsage.setLimit(100000000);
|
||||
broker.setSchedulePeriodForDiskUsageCheck(2000);
|
||||
startBroker();
|
||||
|
||||
long originalDisk = broker.getSystemUsage().getStoreUsage().getLimit();
|
||||
long originalTmp = broker.getSystemUsage().getTempUsage().getLimit();
|
||||
|
||||
//write a 1 meg file to the file system
|
||||
writeTestFile(1024 * 1024);
|
||||
Thread.sleep(5000);
|
||||
|
||||
//Assert that the usage limits have not changed because writing a 1 meg file
|
||||
//did not decrease the the free space below the already set limit
|
||||
assertEquals(originalDisk, broker.getSystemUsage().getStoreUsage().getLimit());
|
||||
assertEquals(originalTmp, broker.getSystemUsage().getTempUsage().getLimit());
|
||||
}
|
||||
|
||||
protected void setLimitMaxSpace() {
|
||||
//Configure store limits to be the max usable space on startup
|
||||
tempUsage.setLimit(broker.getTempDataStore().getDirectory().getUsableSpace());
|
||||
storeUsage.setLimit(adapter.getDirectory().getUsableSpace());
|
||||
}
|
||||
|
||||
protected void writeTestFile(int size) throws IOException {
|
||||
final byte[] data = new byte[size];
|
||||
final Random rng = new Random();
|
||||
rng.nextBytes(data);
|
||||
IOUtils.write(data, new FileOutputStream(testfile));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue