HADOOP-12975. Add jitter to CachingGetSpaceUsed's thread (Elliott Clark via Colin P. McCabe).
HADOOP-14408. Backport contributed by Erik Krogen. (cherry picked from commitbf780406f2
) (cherry picked from commit56c997a1a6
) (cherry picked from commit04682cc6b0
)
This commit is contained in:
parent
f052976ff1
commit
834d4217b7
|
@ -32,6 +32,10 @@ Release 2.7.4 - UNRELEASED
|
|||
HADOOP-14276. Add a nanosecond API to Time/Timer/FakeTimer.
|
||||
(Erik Krogen via zhz)
|
||||
|
||||
HADOOP-12975. Add jitter to CachingGetSpaceUsed's thread
|
||||
(Elliott Clark via Colin P. McCabe).
|
||||
HADOOP-14408. Backport contributed by Erik Krogen.
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-14138. Remove S3A ref from META-INF service discovery, rely on
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
|
@ -32,6 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public class DU extends Shell {
|
||||
static final String JITTER_KEY = "fs.getspaceused.jitterMillis";
|
||||
static final long DEFAULT_JITTER = TimeUnit.MINUTES.toMillis(1);
|
||||
|
||||
private String dirPath;
|
||||
|
||||
private AtomicLong used = new AtomicLong();
|
||||
|
@ -39,7 +43,8 @@ public class DU extends Shell {
|
|||
private Thread refreshUsed;
|
||||
private IOException duException = null;
|
||||
private long refreshInterval;
|
||||
|
||||
private final long jitter;
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
|
@ -47,18 +52,23 @@ public class DU extends Shell {
|
|||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, long interval) throws IOException {
|
||||
this(path, interval, -1L);
|
||||
this(path, interval, 0L, -1L);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
* @param interval refresh the disk usage at this interval
|
||||
* @param jitter randomize the refresh interval timing by this amount; the
|
||||
* actual interval will be randomly chosen between
|
||||
* {@code interval-jitter} and {@code interval+jitter}
|
||||
* @param initialUsed use this value until next refresh
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, long interval, long initialUsed) throws IOException {
|
||||
public DU(File path, long interval, long jitter, long initialUsed)
|
||||
throws IOException {
|
||||
super(0);
|
||||
this.jitter = jitter;
|
||||
|
||||
//we set the Shell interval to 0 so it will always run our command
|
||||
//and use this one to set the thread sleep interval
|
||||
|
@ -93,7 +103,8 @@ public class DU extends Shell {
|
|||
public DU(File path, Configuration conf, long initialUsed)
|
||||
throws IOException {
|
||||
this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
|
||||
CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
|
||||
CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT),
|
||||
conf.getLong(JITTER_KEY, DEFAULT_JITTER), initialUsed);
|
||||
}
|
||||
|
||||
|
||||
|
@ -112,7 +123,13 @@ public class DU extends Shell {
|
|||
while(shouldRun) {
|
||||
|
||||
try {
|
||||
Thread.sleep(refreshInterval);
|
||||
long thisRefreshInterval = refreshInterval;
|
||||
if (jitter > 0) {
|
||||
// add/subtract the jitter.
|
||||
thisRefreshInterval +=
|
||||
ThreadLocalRandom.current().nextLong(-jitter, jitter);
|
||||
}
|
||||
Thread.sleep(thisRefreshInterval);
|
||||
|
||||
try {
|
||||
//update the used variable
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.io.RandomAccessFile;
|
|||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
||||
/** This test makes sure that "DU" does not get to run on each call to getUsed */
|
||||
public class TestDU extends TestCase {
|
||||
|
@ -78,8 +77,8 @@ public class TestDU extends TestCase {
|
|||
createFile(file, writtenSize);
|
||||
|
||||
Thread.sleep(5000); // let the metadata updater catch up
|
||||
|
||||
DU du = new DU(file, 10000);
|
||||
|
||||
DU du = new DU(file, 10000, 0, -1);
|
||||
du.start();
|
||||
long duSize = du.getUsed();
|
||||
du.shutdown();
|
||||
|
@ -89,7 +88,7 @@ public class TestDU extends TestCase {
|
|||
writtenSize <= (duSize + slack));
|
||||
|
||||
//test with 0 interval, will not launch thread
|
||||
du = new DU(file, 0);
|
||||
du = new DU(file, 0, 1, -1);
|
||||
du.start();
|
||||
duSize = du.getUsed();
|
||||
du.shutdown();
|
||||
|
@ -99,7 +98,7 @@ public class TestDU extends TestCase {
|
|||
writtenSize <= (duSize + slack));
|
||||
|
||||
//test without launching thread
|
||||
du = new DU(file, 10000);
|
||||
du = new DU(file, 10000, 0, -1);
|
||||
duSize = du.getUsed();
|
||||
|
||||
assertTrue("Invalid on-disk size",
|
||||
|
@ -111,7 +110,7 @@ public class TestDU extends TestCase {
|
|||
assertTrue(file.createNewFile());
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
|
||||
DU du = new DU(file, conf);
|
||||
DU du = new DU(file, 10000L, 0, -1);
|
||||
du.decDfsUsed(Long.MAX_VALUE);
|
||||
long duSize = du.getUsed();
|
||||
assertTrue(String.valueOf(duSize), duSize >= 0L);
|
||||
|
@ -120,7 +119,7 @@ public class TestDU extends TestCase {
|
|||
public void testDUSetInitialValue() throws IOException {
|
||||
File file = new File(DU_DIR, "dataX");
|
||||
createFile(file, 8192);
|
||||
DU du = new DU(file, 3000, 1024);
|
||||
DU du = new DU(file, 3000, 0, 1024);
|
||||
du.start();
|
||||
assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
|
||||
|
||||
|
|
Loading…
Reference in New Issue