HADOOP-12975. Add jitter to CachingGetSpaceUsed's thread (Elliott Clark via Colin P. McCabe)
(cherry picked from commitbf780406f2
) (cherry picked from commit56c997a1a6
)
This commit is contained in:
parent
b07cd9c6a9
commit
04682cc6b0
|
@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -43,6 +44,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
protected final AtomicLong used = new AtomicLong();
|
protected final AtomicLong used = new AtomicLong();
|
||||||
private final AtomicBoolean running = new AtomicBoolean(true);
|
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||||
private final long refreshInterval;
|
private final long refreshInterval;
|
||||||
|
private final long jitter;
|
||||||
private final String dirPath;
|
private final String dirPath;
|
||||||
private Thread refreshUsed;
|
private Thread refreshUsed;
|
||||||
|
|
||||||
|
@ -52,7 +54,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
*/
|
*/
|
||||||
public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
|
public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
this(builder.getPath(),
|
||||||
|
builder.getInterval(),
|
||||||
|
builder.getJitter(),
|
||||||
|
builder.getInitialUsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,10 +70,12 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
*/
|
*/
|
||||||
CachingGetSpaceUsed(File path,
|
CachingGetSpaceUsed(File path,
|
||||||
long interval,
|
long interval,
|
||||||
|
long jitter,
|
||||||
long initialUsed) throws IOException {
|
long initialUsed) throws IOException {
|
||||||
dirPath = path.getCanonicalPath();
|
this.dirPath = path.getCanonicalPath();
|
||||||
refreshInterval = interval;
|
this.refreshInterval = interval;
|
||||||
used.set(initialUsed);
|
this.jitter = jitter;
|
||||||
|
this.used.set(initialUsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
void init() {
|
void init() {
|
||||||
|
@ -155,7 +162,18 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
public void run() {
|
public void run() {
|
||||||
while (spaceUsed.running()) {
|
while (spaceUsed.running()) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(spaceUsed.getRefreshInterval());
|
long refreshInterval = spaceUsed.refreshInterval;
|
||||||
|
|
||||||
|
if (spaceUsed.jitter > 0) {
|
||||||
|
long jitter = spaceUsed.jitter;
|
||||||
|
// add/subtract the jitter.
|
||||||
|
refreshInterval +=
|
||||||
|
ThreadLocalRandom.current()
|
||||||
|
.nextLong(-jitter, jitter);
|
||||||
|
}
|
||||||
|
// Make sure that after the jitter we didn't end up at 0.
|
||||||
|
refreshInterval = Math.max(refreshInterval, 1);
|
||||||
|
Thread.sleep(refreshInterval);
|
||||||
// update the used variable
|
// update the used variable
|
||||||
spaceUsed.refresh();
|
spaceUsed.refresh();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -34,12 +34,16 @@ public class DU extends CachingGetSpaceUsed {
|
||||||
private DUShell duShell;
|
private DUShell duShell;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public DU(File path, long interval, long initialUsed) throws IOException {
|
public DU(File path, long interval, long jitter, long initialUsed)
|
||||||
super(path, interval, initialUsed);
|
throws IOException {
|
||||||
|
super(path, interval, jitter, initialUsed);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
|
public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
|
||||||
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
this(builder.getPath(),
|
||||||
|
builder.getInterval(),
|
||||||
|
builder.getJitter(),
|
||||||
|
builder.getInitialUsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -26,8 +26,11 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public interface GetSpaceUsed {
|
public interface GetSpaceUsed {
|
||||||
|
|
||||||
|
|
||||||
long getUsed() throws IOException;
|
long getUsed() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -37,11 +40,15 @@ public interface GetSpaceUsed {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
|
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
|
||||||
|
|
||||||
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
|
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
|
||||||
|
static final String JITTER_KEY = "fs.getspaceused.jitterMillis";
|
||||||
|
static final long DEFAULT_JITTER = TimeUnit.MINUTES.toMillis(1);
|
||||||
|
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private Class<? extends GetSpaceUsed> klass = null;
|
private Class<? extends GetSpaceUsed> klass = null;
|
||||||
private File path = null;
|
private File path = null;
|
||||||
private Long interval = null;
|
private Long interval = null;
|
||||||
|
private Long jitter = null;
|
||||||
private Long initialUsed = null;
|
private Long initialUsed = null;
|
||||||
|
|
||||||
public Configuration getConf() {
|
public Configuration getConf() {
|
||||||
|
@ -111,6 +118,24 @@ public interface GetSpaceUsed {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public long getJitter() {
|
||||||
|
if (jitter == null) {
|
||||||
|
Configuration configuration = this.conf;
|
||||||
|
|
||||||
|
if (configuration == null) {
|
||||||
|
return DEFAULT_JITTER;
|
||||||
|
}
|
||||||
|
return configuration.getLong(JITTER_KEY, DEFAULT_JITTER);
|
||||||
|
}
|
||||||
|
return jitter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setJitter(Long jit) {
|
||||||
|
this.jitter = jit;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public GetSpaceUsed build() throws IOException {
|
public GetSpaceUsed build() throws IOException {
|
||||||
GetSpaceUsed getSpaceUsed = null;
|
GetSpaceUsed getSpaceUsed = null;
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -31,10 +31,11 @@ import java.io.IOException;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
|
public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
|
||||||
|
|
||||||
|
WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
|
||||||
public WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
|
super(builder.getPath(),
|
||||||
throws IOException {
|
builder.getInterval(),
|
||||||
super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
builder.getJitter(),
|
||||||
|
builder.getInitialUsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -79,7 +79,7 @@ public class TestDU extends TestCase {
|
||||||
|
|
||||||
Thread.sleep(5000); // let the metadata updater catch up
|
Thread.sleep(5000); // let the metadata updater catch up
|
||||||
|
|
||||||
DU du = new DU(file, 10000, -1);
|
DU du = new DU(file, 10000, 0, -1);
|
||||||
du.init();
|
du.init();
|
||||||
long duSize = du.getUsed();
|
long duSize = du.getUsed();
|
||||||
du.close();
|
du.close();
|
||||||
|
@ -89,7 +89,7 @@ public class TestDU extends TestCase {
|
||||||
writtenSize <= (duSize + slack));
|
writtenSize <= (duSize + slack));
|
||||||
|
|
||||||
//test with 0 interval, will not launch thread
|
//test with 0 interval, will not launch thread
|
||||||
du = new DU(file, 0, -1);
|
du = new DU(file, 0, 1, -1);
|
||||||
du.init();
|
du.init();
|
||||||
duSize = du.getUsed();
|
duSize = du.getUsed();
|
||||||
du.close();
|
du.close();
|
||||||
|
@ -99,7 +99,7 @@ public class TestDU extends TestCase {
|
||||||
writtenSize <= (duSize + slack));
|
writtenSize <= (duSize + slack));
|
||||||
|
|
||||||
//test without launching thread
|
//test without launching thread
|
||||||
du = new DU(file, 10000, -1);
|
du = new DU(file, 10000, 0, -1);
|
||||||
du.init();
|
du.init();
|
||||||
duSize = du.getUsed();
|
duSize = du.getUsed();
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ public class TestDU extends TestCase {
|
||||||
assertTrue(file.createNewFile());
|
assertTrue(file.createNewFile());
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
|
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
|
||||||
DU du = new DU(file, 10000L, -1);
|
DU du = new DU(file, 10000L, 0, -1);
|
||||||
du.incDfsUsed(-Long.MAX_VALUE);
|
du.incDfsUsed(-Long.MAX_VALUE);
|
||||||
long duSize = du.getUsed();
|
long duSize = du.getUsed();
|
||||||
assertTrue(String.valueOf(duSize), duSize >= 0L);
|
assertTrue(String.valueOf(duSize), duSize >= 0L);
|
||||||
|
@ -121,7 +121,7 @@ public class TestDU extends TestCase {
|
||||||
public void testDUSetInitialValue() throws IOException {
|
public void testDUSetInitialValue() throws IOException {
|
||||||
File file = new File(DU_DIR, "dataX");
|
File file = new File(DU_DIR, "dataX");
|
||||||
createFile(file, 8192);
|
createFile(file, 8192);
|
||||||
DU du = new DU(file, 3000, 1024);
|
DU du = new DU(file, 3000, 0, 1024);
|
||||||
du.init();
|
du.init();
|
||||||
assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
|
assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue