HADOOP-12973. Make DU pluggable. (Elliott Clark via cmccabe)
This commit is contained in:
parent
042a3ae960
commit
35f0770555
|
@ -0,0 +1,168 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Interface for class that can tell estimate much space
|
||||
* is used in a directory.
|
||||
* <p>
|
||||
* The implementor is fee to cache space used. As such there
|
||||
* are methods to update the cached value with any known changes.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||
static final Logger LOG = LoggerFactory.getLogger(CachingGetSpaceUsed.class);
|
||||
|
||||
protected final AtomicLong used = new AtomicLong();
|
||||
private final AtomicBoolean running = new AtomicBoolean(true);
|
||||
private final long refreshInterval;
|
||||
private final String dirPath;
|
||||
private Thread refreshUsed;
|
||||
|
||||
/**
|
||||
* This is the constructor used by the builder.
|
||||
* All overriding classes should implement this.
|
||||
*/
|
||||
public CachingGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
|
||||
throws IOException {
|
||||
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
*
|
||||
* @param path the path to check disk usage in
|
||||
* @param interval refresh the disk usage at this interval
|
||||
* @param initialUsed use this value until next refresh
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
CachingGetSpaceUsed(File path,
|
||||
long interval,
|
||||
long initialUsed) throws IOException {
|
||||
dirPath = path.getCanonicalPath();
|
||||
refreshInterval = interval;
|
||||
used.set(initialUsed);
|
||||
}
|
||||
|
||||
void init() {
|
||||
if (used.get() < 0) {
|
||||
used.set(0);
|
||||
refresh();
|
||||
}
|
||||
|
||||
if (refreshInterval > 0) {
|
||||
refreshUsed = new Thread(new RefreshThread(this),
|
||||
"refreshUsed-" + dirPath);
|
||||
refreshUsed.setDaemon(true);
|
||||
refreshUsed.start();
|
||||
} else {
|
||||
running.set(false);
|
||||
refreshUsed = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void refresh();
|
||||
|
||||
/**
|
||||
* @return an estimate of space used in the directory path.
|
||||
*/
|
||||
@Override public long getUsed() throws IOException {
|
||||
return Math.max(used.get(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The directory path being monitored.
|
||||
*/
|
||||
public String getDirPath() {
|
||||
return dirPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the cached value of used space.
|
||||
*/
|
||||
public void incDfsUsed(long value) {
|
||||
used.addAndGet(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the background thread running.
|
||||
*/
|
||||
boolean running() {
|
||||
return running.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* How long in between runs of the background refresh.
|
||||
*/
|
||||
long getRefreshInterval() {
|
||||
return refreshInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the current used data amount. This should be called
|
||||
* when the cached value is re-computed.
|
||||
*
|
||||
* @param usedValue new value that should be the disk usage.
|
||||
*/
|
||||
protected void setUsed(long usedValue) {
|
||||
this.used.set(usedValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
running.set(false);
|
||||
if (refreshUsed != null) {
|
||||
refreshUsed.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class RefreshThread implements Runnable {
|
||||
|
||||
final CachingGetSpaceUsed spaceUsed;
|
||||
|
||||
RefreshThread(CachingGetSpaceUsed spaceUsed) {
|
||||
this.spaceUsed = spaceUsed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (spaceUsed.running()) {
|
||||
try {
|
||||
Thread.sleep(spaceUsed.getRefreshInterval());
|
||||
// update the used variable
|
||||
spaceUsed.refresh();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Thread Interrupted waiting to refresh disk information", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,227 +17,73 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/** Filesystem disk space usage statistics. Uses the unix 'du' program*/
|
||||
/** Filesystem disk space usage statistics. Uses the unix 'du' program */
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public class DU extends Shell {
|
||||
private String dirPath;
|
||||
public class DU extends CachingGetSpaceUsed {
|
||||
private DUShell duShell;
|
||||
|
||||
private AtomicLong used = new AtomicLong();
|
||||
private volatile boolean shouldRun = true;
|
||||
private Thread refreshUsed;
|
||||
private IOException duException = null;
|
||||
private long refreshInterval;
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
* @param interval refresh the disk usage at this interval
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, long interval) throws IOException {
|
||||
this(path, interval, -1L);
|
||||
@VisibleForTesting
|
||||
public DU(File path, long interval, long initialUsed) throws IOException {
|
||||
super(path, interval, initialUsed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
* @param interval refresh the disk usage at this interval
|
||||
* @param initialUsed use this value until next refresh
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, long interval, long initialUsed) throws IOException {
|
||||
super(0);
|
||||
|
||||
//we set the Shell interval to 0 so it will always run our command
|
||||
//and use this one to set the thread sleep interval
|
||||
this.refreshInterval = interval;
|
||||
this.dirPath = path.getCanonicalPath();
|
||||
public DU(CachingGetSpaceUsed.Builder builder) throws IOException {
|
||||
this(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
||||
}
|
||||
|
||||
//populate the used variable if the initial value is not specified.
|
||||
if (initialUsed < 0) {
|
||||
run();
|
||||
} else {
|
||||
this.used.set(initialUsed);
|
||||
@Override
|
||||
protected synchronized void refresh() {
|
||||
if (duShell == null) {
|
||||
duShell = new DUShell();
|
||||
}
|
||||
try {
|
||||
duShell.startRefresh();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Could not get disk usage information", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
* @param conf configuration object
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, Configuration conf) throws IOException {
|
||||
this(path, conf, -1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps track of disk usage.
|
||||
* @param path the path to check disk usage in
|
||||
* @param conf configuration object
|
||||
* @param initialUsed use it until the next refresh.
|
||||
* @throws IOException if we fail to refresh the disk usage
|
||||
*/
|
||||
public DU(File path, Configuration conf, long initialUsed)
|
||||
throws IOException {
|
||||
this(path, conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY,
|
||||
CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT), initialUsed);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* This thread refreshes the "used" variable.
|
||||
*
|
||||
* Future improvements could be to not permanently
|
||||
* run this thread, instead run when getUsed is called.
|
||||
**/
|
||||
class DURefreshThread implements Runnable {
|
||||
|
||||
private final class DUShell extends Shell {
|
||||
void startRefresh() throws IOException {
|
||||
run();
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
while(shouldRun) {
|
||||
public String toString() {
|
||||
return
|
||||
"du -sk " + getDirPath() + "\n" + used.get() + "\t" + getDirPath();
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(refreshInterval);
|
||||
|
||||
try {
|
||||
//update the used variable
|
||||
DU.this.run();
|
||||
} catch (IOException e) {
|
||||
synchronized (DU.this) {
|
||||
//save the latest exception so we can return it in getUsed()
|
||||
duException = e;
|
||||
}
|
||||
|
||||
LOG.warn("Could not get disk usage information", e);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
@Override
|
||||
protected String[] getExecString() {
|
||||
return new String[]{"du", "-sk", getDirPath()};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void parseExecResult(BufferedReader lines) throws IOException {
|
||||
String line = lines.readLine();
|
||||
if (line == null) {
|
||||
throw new IOException("Expecting a line not the end of stream");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrease how much disk space we use.
|
||||
* @param value decrease by this value
|
||||
*/
|
||||
public void decDfsUsed(long value) {
|
||||
used.addAndGet(-value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase how much disk space we use.
|
||||
* @param value increase by this value
|
||||
*/
|
||||
public void incDfsUsed(long value) {
|
||||
used.addAndGet(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return disk space used
|
||||
* @throws IOException if the shell command fails
|
||||
*/
|
||||
public long getUsed() throws IOException {
|
||||
//if the updating thread isn't started, update on demand
|
||||
if(refreshUsed == null) {
|
||||
run();
|
||||
} else {
|
||||
synchronized (DU.this) {
|
||||
//if an exception was thrown in the last run, rethrow
|
||||
if(duException != null) {
|
||||
IOException tmp = duException;
|
||||
duException = null;
|
||||
throw tmp;
|
||||
}
|
||||
String[] tokens = line.split("\t");
|
||||
if (tokens.length == 0) {
|
||||
throw new IOException("Illegal du output");
|
||||
}
|
||||
setUsed(Long.parseLong(tokens[0]) * 1024);
|
||||
}
|
||||
|
||||
return Math.max(used.longValue(), 0L);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the path of which we're keeping track of disk usage
|
||||
*/
|
||||
public String getDirPath() {
|
||||
return dirPath;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Override to hook in DUHelper class. Maybe this can be used more
|
||||
* generally as well on Unix/Linux based systems
|
||||
*/
|
||||
@Override
|
||||
protected void run() throws IOException {
|
||||
if (WINDOWS) {
|
||||
used.set(DUHelper.getFolderUsage(dirPath));
|
||||
return;
|
||||
}
|
||||
super.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the disk usage checking thread.
|
||||
*/
|
||||
public void start() {
|
||||
//only start the thread if the interval is sane
|
||||
if(refreshInterval > 0) {
|
||||
refreshUsed = new Thread(new DURefreshThread(),
|
||||
"refreshUsed-"+dirPath);
|
||||
refreshUsed.setDaemon(true);
|
||||
refreshUsed.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down the refreshing thread.
|
||||
*/
|
||||
public void shutdown() {
|
||||
this.shouldRun = false;
|
||||
|
||||
if(this.refreshUsed != null) {
|
||||
this.refreshUsed.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return
|
||||
"du -sk " + dirPath +"\n" +
|
||||
used + "\t" + dirPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String[] getExecString() {
|
||||
return new String[] {"du", "-sk", dirPath};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void parseExecResult(BufferedReader lines) throws IOException {
|
||||
String line = lines.readLine();
|
||||
if (line == null) {
|
||||
throw new IOException("Expecting a line not the end of stream");
|
||||
}
|
||||
String[] tokens = line.split("\t");
|
||||
if(tokens.length == 0) {
|
||||
throw new IOException("Illegal du output");
|
||||
}
|
||||
this.used.set(Long.parseLong(tokens[0])*1024);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
String path = ".";
|
||||
|
@ -245,6 +91,10 @@ public class DU extends Shell {
|
|||
path = args[0];
|
||||
}
|
||||
|
||||
System.out.println(new DU(new File(path), new Configuration()).toString());
|
||||
GetSpaceUsed du = new GetSpaceUsed.Builder().setPath(new File(path))
|
||||
.setConf(new Configuration())
|
||||
.build();
|
||||
String duResult = du.toString();
|
||||
System.out.println(duResult);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
public interface GetSpaceUsed {
|
||||
long getUsed() throws IOException;
|
||||
|
||||
/**
|
||||
* The builder class
|
||||
*/
|
||||
final class Builder {
|
||||
static final Logger LOG = LoggerFactory.getLogger(Builder.class);
|
||||
|
||||
static final String CLASSNAME_KEY = "fs.getspaceused.classname";
|
||||
|
||||
private Configuration conf;
|
||||
private Class<? extends GetSpaceUsed> klass = null;
|
||||
private File path = null;
|
||||
private Long interval = null;
|
||||
private Long initialUsed = null;
|
||||
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
public Builder setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getInterval() {
|
||||
if (interval != null) {
|
||||
return interval;
|
||||
}
|
||||
long result = CommonConfigurationKeys.FS_DU_INTERVAL_DEFAULT;
|
||||
if (conf == null) {
|
||||
return result;
|
||||
}
|
||||
return conf.getLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, result);
|
||||
}
|
||||
|
||||
public Builder setInterval(long interval) {
|
||||
this.interval = interval;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Class<? extends GetSpaceUsed> getKlass() {
|
||||
if (klass != null) {
|
||||
return klass;
|
||||
}
|
||||
Class<? extends GetSpaceUsed> result = null;
|
||||
if (Shell.WINDOWS) {
|
||||
result = WindowsGetSpaceUsed.class;
|
||||
} else {
|
||||
result = DU.class;
|
||||
}
|
||||
if (conf == null) {
|
||||
return result;
|
||||
}
|
||||
return conf.getClass(CLASSNAME_KEY, result, GetSpaceUsed.class);
|
||||
}
|
||||
|
||||
public Builder setKlass(Class<? extends GetSpaceUsed> klass) {
|
||||
this.klass = klass;
|
||||
return this;
|
||||
}
|
||||
|
||||
public File getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public Builder setPath(File path) {
|
||||
this.path = path;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getInitialUsed() {
|
||||
if (initialUsed == null) {
|
||||
return -1;
|
||||
}
|
||||
return initialUsed;
|
||||
}
|
||||
|
||||
public Builder setInitialUsed(long initialUsed) {
|
||||
this.initialUsed = initialUsed;
|
||||
return this;
|
||||
}
|
||||
|
||||
public GetSpaceUsed build() throws IOException {
|
||||
GetSpaceUsed getSpaceUsed = null;
|
||||
try {
|
||||
Constructor<? extends GetSpaceUsed> cons =
|
||||
getKlass().getConstructor(Builder.class);
|
||||
getSpaceUsed = cons.newInstance(this);
|
||||
} catch (InstantiationException e) {
|
||||
LOG.warn("Error trying to create an instance of " + getKlass(), e);
|
||||
} catch (IllegalAccessException e) {
|
||||
LOG.warn("Error trying to create " + getKlass(), e);
|
||||
} catch (InvocationTargetException e) {
|
||||
LOG.warn("Error trying to create " + getKlass(), e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.warn("Doesn't look like the class " + getKlass() +
|
||||
" have the needed constructor", e);
|
||||
}
|
||||
// If there were any exceptions then du will be null.
|
||||
// Construct our best guess fallback.
|
||||
if (getSpaceUsed == null) {
|
||||
if (Shell.WINDOWS) {
|
||||
getSpaceUsed = new WindowsGetSpaceUsed(this);
|
||||
} else {
|
||||
getSpaceUsed = new DU(this);
|
||||
}
|
||||
}
|
||||
// Call init after classes constructors have finished.
|
||||
if (getSpaceUsed instanceof CachingGetSpaceUsed) {
|
||||
((CachingGetSpaceUsed) getSpaceUsed).init();
|
||||
}
|
||||
return getSpaceUsed;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Class to tell the size of a path on windows.
|
||||
* Rather than shelling out, on windows this uses DUHelper.getFolderUsage
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@InterfaceStability.Evolving
|
||||
public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
|
||||
|
||||
|
||||
WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
|
||||
super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to hook in DUHelper class.
|
||||
*/
|
||||
@Override
|
||||
protected void refresh() {
|
||||
used.set(DUHelper.getFolderUsage(getDirPath()));
|
||||
}
|
||||
}
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
/** This test makes sure that "DU" does not get to run on each call to getUsed */
|
||||
/** This test makes sure that "DU" does not get to run on each call to getUsed */
|
||||
public class TestDU extends TestCase {
|
||||
final static private File DU_DIR = GenericTestUtils.getTestDir("dutmp");
|
||||
|
||||
|
@ -42,7 +42,7 @@ public class TestDU extends TestCase {
|
|||
public void tearDown() throws IOException {
|
||||
FileUtil.fullyDelete(DU_DIR);
|
||||
}
|
||||
|
||||
|
||||
private void createFile(File newFile, int size) throws IOException {
|
||||
// write random data so that filesystems with compression enabled (e.g., ZFS)
|
||||
// can't compress the file
|
||||
|
@ -54,18 +54,18 @@ public class TestDU extends TestCase {
|
|||
RandomAccessFile file = new RandomAccessFile(newFile, "rws");
|
||||
|
||||
file.write(data);
|
||||
|
||||
|
||||
file.getFD().sync();
|
||||
file.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that du returns expected used space for a file.
|
||||
* We assume here that if a file system crates a file of size
|
||||
* We assume here that if a file system crates a file of size
|
||||
* that is a multiple of the block size in this file system,
|
||||
* then the used size for the file will be exactly that size.
|
||||
* This is true for most file systems.
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
|
@ -78,28 +78,29 @@ public class TestDU extends TestCase {
|
|||
createFile(file, writtenSize);
|
||||
|
||||
Thread.sleep(5000); // let the metadata updater catch up
|
||||
|
||||
DU du = new DU(file, 10000);
|
||||
du.start();
|
||||
|
||||
DU du = new DU(file, 10000, -1);
|
||||
du.init();
|
||||
long duSize = du.getUsed();
|
||||
du.shutdown();
|
||||
du.close();
|
||||
|
||||
assertTrue("Invalid on-disk size",
|
||||
duSize >= writtenSize &&
|
||||
writtenSize <= (duSize + slack));
|
||||
|
||||
//test with 0 interval, will not launch thread
|
||||
du = new DU(file, 0);
|
||||
du.start();
|
||||
|
||||
//test with 0 interval, will not launch thread
|
||||
du = new DU(file, 0, -1);
|
||||
du.init();
|
||||
duSize = du.getUsed();
|
||||
du.shutdown();
|
||||
|
||||
du.close();
|
||||
|
||||
assertTrue("Invalid on-disk size",
|
||||
duSize >= writtenSize &&
|
||||
writtenSize <= (duSize + slack));
|
||||
|
||||
//test without launching thread
|
||||
du = new DU(file, 10000);
|
||||
|
||||
//test without launching thread
|
||||
du = new DU(file, 10000, -1);
|
||||
du.init();
|
||||
duSize = du.getUsed();
|
||||
|
||||
assertTrue("Invalid on-disk size",
|
||||
|
@ -111,8 +112,8 @@ public class TestDU extends TestCase {
|
|||
assertTrue(file.createNewFile());
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, 10000L);
|
||||
DU du = new DU(file, conf);
|
||||
du.decDfsUsed(Long.MAX_VALUE);
|
||||
DU du = new DU(file, 10000L, -1);
|
||||
du.incDfsUsed(-Long.MAX_VALUE);
|
||||
long duSize = du.getUsed();
|
||||
assertTrue(String.valueOf(duSize), duSize >= 0L);
|
||||
}
|
||||
|
@ -121,7 +122,7 @@ public class TestDU extends TestCase {
|
|||
File file = new File(DU_DIR, "dataX");
|
||||
createFile(file, 8192);
|
||||
DU du = new DU(file, 3000, 1024);
|
||||
du.start();
|
||||
du.init();
|
||||
assertTrue("Initial usage setting not honored", du.getUsed() == 1024);
|
||||
|
||||
// wait until the first du runs.
|
||||
|
@ -131,4 +132,7 @@ public class TestDU extends TestCase {
|
|||
|
||||
assertTrue("Usage didn't get updated", du.getUsed() == 8192);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,133 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestGetSpaceUsed {
|
||||
final static private File DIR = new File(
|
||||
System.getProperty("test.build.data", "/tmp"), "TestGetSpaceUsed");
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
FileUtil.fullyDelete(DIR);
|
||||
assertTrue(DIR.mkdirs());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
FileUtil.fullyDelete(DIR);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the builder can create a class specified through the class.
|
||||
*/
|
||||
@Test
|
||||
public void testBuilderConf() throws Exception {
|
||||
File file = new File(DIR, "testBuilderConf");
|
||||
assertTrue(file.createNewFile());
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.getspaceused.classname", DummyDU.class.getName());
|
||||
CachingGetSpaceUsed instance =
|
||||
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
|
||||
.setPath(file)
|
||||
.setInterval(0)
|
||||
.setConf(conf)
|
||||
.build();
|
||||
assertNotNull(instance);
|
||||
assertTrue(instance instanceof DummyDU);
|
||||
assertFalse(instance.running());
|
||||
instance.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildInitial() throws Exception {
|
||||
File file = new File(DIR, "testBuildInitial");
|
||||
assertTrue(file.createNewFile());
|
||||
CachingGetSpaceUsed instance =
|
||||
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
|
||||
.setPath(file)
|
||||
.setInitialUsed(90210)
|
||||
.setKlass(DummyDU.class)
|
||||
.build();
|
||||
assertEquals(90210, instance.getUsed());
|
||||
instance.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildInterval() throws Exception {
|
||||
File file = new File(DIR, "testBuildInitial");
|
||||
assertTrue(file.createNewFile());
|
||||
CachingGetSpaceUsed instance =
|
||||
(CachingGetSpaceUsed) new CachingGetSpaceUsed.Builder()
|
||||
.setPath(file)
|
||||
.setInitialUsed(90210)
|
||||
.setInterval(50060)
|
||||
.setKlass(DummyDU.class)
|
||||
.build();
|
||||
assertEquals(50060, instance.getRefreshInterval());
|
||||
instance.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildNonCaching() throws Exception {
|
||||
File file = new File(DIR, "testBuildNonCaching");
|
||||
assertTrue(file.createNewFile());
|
||||
GetSpaceUsed instance = new CachingGetSpaceUsed.Builder()
|
||||
.setPath(file)
|
||||
.setInitialUsed(90210)
|
||||
.setInterval(50060)
|
||||
.setKlass(DummyGetSpaceUsed.class)
|
||||
.build();
|
||||
assertEquals(300, instance.getUsed());
|
||||
assertTrue(instance instanceof DummyGetSpaceUsed);
|
||||
}
|
||||
|
||||
private static class DummyDU extends CachingGetSpaceUsed {
|
||||
|
||||
public DummyDU(Builder builder) throws IOException {
|
||||
// Push to the base class.
|
||||
// Most times that's all that will need to be done.
|
||||
super(builder);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void refresh() {
|
||||
// This is a test so don't du anything.
|
||||
}
|
||||
}
|
||||
|
||||
private static class DummyGetSpaceUsed implements GetSpaceUsed {
|
||||
|
||||
public DummyGetSpaceUsed(GetSpaceUsed.Builder builder) {
|
||||
|
||||
}
|
||||
|
||||
@Override public long getUsed() throws IOException {
|
||||
return 300;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -36,8 +36,9 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DU;
|
||||
import org.apache.hadoop.fs.CachingGetSpaceUsed;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.GetSpaceUsed;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
@ -62,10 +63,10 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.common.io.Files;
|
||||
|
||||
/**
|
||||
* A block pool slice represents a portion of a block pool stored on a volume.
|
||||
* Taken together, all BlockPoolSlices sharing a block pool ID across a
|
||||
* A block pool slice represents a portion of a block pool stored on a volume.
|
||||
* Taken together, all BlockPoolSlices sharing a block pool ID across a
|
||||
* cluster represent a single block pool.
|
||||
*
|
||||
*
|
||||
* This class is synchronized by {@link FsVolumeImpl}.
|
||||
*/
|
||||
class BlockPoolSlice {
|
||||
|
@ -92,10 +93,10 @@ class BlockPoolSlice {
|
|||
private final Timer timer;
|
||||
|
||||
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
||||
private final DU dfsUsage;
|
||||
private final GetSpaceUsed dfsUsage;
|
||||
|
||||
/**
|
||||
* Create a blook pool slice
|
||||
* Create a blook pool slice
|
||||
* @param bpid Block pool Id
|
||||
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
|
||||
* @param bpDir directory corresponding to the BlockPool
|
||||
|
@ -107,7 +108,7 @@ class BlockPoolSlice {
|
|||
Configuration conf, Timer timer) throws IOException {
|
||||
this.bpid = bpid;
|
||||
this.volume = volume;
|
||||
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
this.finalizedDir = new File(
|
||||
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
|
||||
this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
|
||||
|
@ -151,8 +152,10 @@ class BlockPoolSlice {
|
|||
}
|
||||
// Use cached value initially if available. Or the following call will
|
||||
// block until the initial du command completes.
|
||||
this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
|
||||
this.dfsUsage.start();
|
||||
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
|
||||
.setConf(conf)
|
||||
.setInitialUsed(loadDfsUsed())
|
||||
.build();
|
||||
|
||||
// Make the dfs usage to be saved during shutdown.
|
||||
ShutdownHookManager.get().addShutdownHook(
|
||||
|
@ -173,7 +176,7 @@ class BlockPoolSlice {
|
|||
File getFinalizedDir() {
|
||||
return finalizedDir;
|
||||
}
|
||||
|
||||
|
||||
File getLazypersistDir() {
|
||||
return lazypersistDir;
|
||||
}
|
||||
|
@ -188,17 +191,21 @@ class BlockPoolSlice {
|
|||
|
||||
/** Run DU on local drives. It must be synchronized from caller. */
|
||||
void decDfsUsed(long value) {
|
||||
dfsUsage.decDfsUsed(value);
|
||||
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
||||
((CachingGetSpaceUsed)dfsUsage).incDfsUsed(-value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
return dfsUsage.getUsed();
|
||||
}
|
||||
|
||||
void incDfsUsed(long value) {
|
||||
dfsUsage.incDfsUsed(value);
|
||||
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
||||
((CachingGetSpaceUsed)dfsUsage).incDfsUsed(value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read in the cached DU value and return it if it is less than
|
||||
* cachedDfsUsedCheckTime which is set by
|
||||
|
@ -304,7 +311,10 @@ class BlockPoolSlice {
|
|||
}
|
||||
File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
|
||||
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
|
||||
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
|
||||
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
||||
((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
|
||||
b.getNumBytes() + metaFile.length());
|
||||
}
|
||||
return blockFile;
|
||||
}
|
||||
|
||||
|
@ -331,7 +341,7 @@ class BlockPoolSlice {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
void getVolumeMap(ReplicaMap volumeMap,
|
||||
final RamDiskReplicaTracker lazyWriteReplicaMap)
|
||||
throws IOException {
|
||||
|
@ -342,7 +352,7 @@ class BlockPoolSlice {
|
|||
FsDatasetImpl.LOG.info(
|
||||
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
|
||||
}
|
||||
|
||||
|
||||
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
|
||||
if (!success) {
|
||||
// add finalized replicas
|
||||
|
@ -436,7 +446,7 @@ class BlockPoolSlice {
|
|||
FileUtil.fullyDelete(source);
|
||||
return numRecovered;
|
||||
}
|
||||
|
||||
|
||||
private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
|
||||
final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized)
|
||||
throws IOException {
|
||||
|
@ -444,7 +454,7 @@ class BlockPoolSlice {
|
|||
long blockId = block.getBlockId();
|
||||
long genStamp = block.getGenerationStamp();
|
||||
if (isFinalized) {
|
||||
newReplica = new FinalizedReplica(blockId,
|
||||
newReplica = new FinalizedReplica(blockId,
|
||||
block.getNumBytes(), genStamp, volume, DatanodeUtil
|
||||
.idToBlockDir(finalizedDir, blockId));
|
||||
} else {
|
||||
|
@ -461,7 +471,7 @@ class BlockPoolSlice {
|
|||
// We don't know the expected block length, so just use 0
|
||||
// and don't reserve any more space for writes.
|
||||
newReplica = new ReplicaBeingWritten(blockId,
|
||||
validateIntegrityAndSetLength(file, genStamp),
|
||||
validateIntegrityAndSetLength(file, genStamp),
|
||||
genStamp, volume, file.getParentFile(), null, 0);
|
||||
loadRwr = false;
|
||||
}
|
||||
|
@ -507,7 +517,7 @@ class BlockPoolSlice {
|
|||
incrNumBlocks();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Add replicas under the given directory to the volume map
|
||||
|
@ -537,12 +547,12 @@ class BlockPoolSlice {
|
|||
}
|
||||
if (!Block.isBlockFilename(file))
|
||||
continue;
|
||||
|
||||
|
||||
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
|
||||
files, file);
|
||||
long blockId = Block.filename2id(file.getName());
|
||||
Block block = new Block(blockId, file.length(), genStamp);
|
||||
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
|
||||
Block block = new Block(blockId, file.length(), genStamp);
|
||||
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
|
||||
isFinalized);
|
||||
}
|
||||
}
|
||||
|
@ -636,11 +646,11 @@ class BlockPoolSlice {
|
|||
|
||||
/**
|
||||
* Find out the number of bytes in the block that match its crc.
|
||||
*
|
||||
* This algorithm assumes that data corruption caused by unexpected
|
||||
*
|
||||
* This algorithm assumes that data corruption caused by unexpected
|
||||
* datanode shutdown occurs only in the last crc chunk. So it checks
|
||||
* only the last chunk.
|
||||
*
|
||||
*
|
||||
* @param blockFile the block file
|
||||
* @param genStamp generation stamp of the block
|
||||
* @return the number of valid bytes
|
||||
|
@ -667,7 +677,7 @@ class BlockPoolSlice {
|
|||
int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
int checksumSize = checksum.getChecksumSize();
|
||||
long numChunks = Math.min(
|
||||
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
|
||||
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
|
||||
(metaFileLen - crcHeaderLen)/checksumSize);
|
||||
if (numChunks == 0) {
|
||||
return 0;
|
||||
|
@ -710,17 +720,20 @@ class BlockPoolSlice {
|
|||
IOUtils.closeStream(blockIn);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return currentDir.getAbsolutePath();
|
||||
}
|
||||
|
||||
|
||||
void shutdown(BlockListAsLongs blocksListToPersist) {
|
||||
saveReplicas(blocksListToPersist);
|
||||
saveDfsUsed();
|
||||
dfsUsedSaved = true;
|
||||
dfsUsage.shutdown();
|
||||
|
||||
if (dfsUsage instanceof CachingGetSpaceUsed) {
|
||||
IOUtils.cleanup(LOG, ((CachingGetSpaceUsed) dfsUsage));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean readReplicasFromCache(ReplicaMap volumeMap,
|
||||
|
@ -729,17 +742,17 @@ class BlockPoolSlice {
|
|||
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
|
||||
// Check whether the file exists or not.
|
||||
if (!replicaFile.exists()) {
|
||||
LOG.info("Replica Cache file: "+ replicaFile.getPath() +
|
||||
LOG.info("Replica Cache file: "+ replicaFile.getPath() +
|
||||
" doesn't exist ");
|
||||
return false;
|
||||
}
|
||||
long fileLastModifiedTime = replicaFile.lastModified();
|
||||
if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
|
||||
LOG.info("Replica Cache file: " + replicaFile.getPath() +
|
||||
LOG.info("Replica Cache file: " + replicaFile.getPath() +
|
||||
" has gone stale");
|
||||
// Just to make findbugs happy
|
||||
if (!replicaFile.delete()) {
|
||||
LOG.info("Replica Cache file: " + replicaFile.getPath() +
|
||||
LOG.info("Replica Cache file: " + replicaFile.getPath() +
|
||||
" cannot be deleted");
|
||||
}
|
||||
return false;
|
||||
|
@ -776,7 +789,7 @@ class BlockPoolSlice {
|
|||
iter.remove();
|
||||
volumeMap.add(bpid, info);
|
||||
}
|
||||
LOG.info("Successfully read replica from cache file : "
|
||||
LOG.info("Successfully read replica from cache file : "
|
||||
+ replicaFile.getPath());
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
|
@ -794,10 +807,10 @@ class BlockPoolSlice {
|
|||
// close the inputStream
|
||||
IOUtils.closeStream(inputStream);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void saveReplicas(BlockListAsLongs blocksListToPersist) {
|
||||
if (blocksListToPersist == null ||
|
||||
if (blocksListToPersist == null ||
|
||||
blocksListToPersist.getNumberOfBlocks()== 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -813,7 +826,7 @@ class BlockPoolSlice {
|
|||
replicaCacheFile.getPath());
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
FileOutputStream out = null;
|
||||
try {
|
||||
out = new FileOutputStream(tmpFile);
|
||||
|
@ -827,7 +840,7 @@ class BlockPoolSlice {
|
|||
// and continue.
|
||||
LOG.warn("Failed to write replicas to cache ", e);
|
||||
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
|
||||
LOG.warn("Failed to delete replicas file: " +
|
||||
LOG.warn("Failed to delete replicas file: " +
|
||||
replicaCacheFile.getPath());
|
||||
}
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue