HDFS-5498. Improve datanode startup time. Contributed by Kihwal Lee.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
57b28693ee
commit
fa6e59891c
|
@ -95,3 +95,5 @@ HDFS-5535 subtasks:
|
|||
#testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal)
|
||||
|
||||
HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)
|
||||
|
||||
HDFS-5498. Improve datanode startup time. (kihwal)
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.DataInputStream;
|
|||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.io.IOUtils;
|
|||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
/**
|
||||
|
@ -60,6 +62,9 @@ class BlockPoolSlice {
|
|||
private final LDir finalizedDir; // directory store Finalized replica
|
||||
private final File rbwDir; // directory store RBW replica
|
||||
private final File tmpDir; // directory store Temporary replica
|
||||
private static String DU_CACHE_FILE = "dfsUsed";
|
||||
private volatile boolean dfsUsedSaved = false;
|
||||
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||
|
||||
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
||||
private final DU dfsUsage;
|
||||
|
@ -110,8 +115,21 @@ class BlockPoolSlice {
|
|||
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
|
||||
}
|
||||
}
|
||||
this.dfsUsage = new DU(bpDir, conf);
|
||||
// Use cached value initially if available. Or the following call will
|
||||
// block until the initial du command completes.
|
||||
this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
|
||||
this.dfsUsage.start();
|
||||
|
||||
// Make the dfs usage to be saved during shutdown.
|
||||
ShutdownHookManager.get().addShutdownHook(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!dfsUsedSaved) {
|
||||
saveDfsUsed();
|
||||
}
|
||||
}
|
||||
}, SHUTDOWN_HOOK_PRIORITY);
|
||||
}
|
||||
|
||||
File getDirectory() {
|
||||
|
@ -135,6 +153,74 @@ class BlockPoolSlice {
|
|||
return dfsUsage.getUsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read in the cached DU value and return it if it is less than 600 seconds
|
||||
* old (DU update interval). Slight imprecision of dfsUsed is not critical
|
||||
* and skipping DU can significantly shorten the startup time.
|
||||
* If the cached value is not available or too old, -1 is returned.
|
||||
*/
|
||||
long loadDfsUsed() {
|
||||
long cachedDfsUsed;
|
||||
long mtime;
|
||||
Scanner sc;
|
||||
|
||||
try {
|
||||
sc = new Scanner(new File(currentDir, DU_CACHE_FILE));
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
// Get the recorded dfsUsed from the file.
|
||||
if (sc.hasNextLong()) {
|
||||
cachedDfsUsed = sc.nextLong();
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
// Get the recorded mtime from the file.
|
||||
if (sc.hasNextLong()) {
|
||||
mtime = sc.nextLong();
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Return the cached value if mtime is okay.
|
||||
if (mtime > 0 && (Time.now() - mtime < 600000L)) {
|
||||
FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
|
||||
cachedDfsUsed);
|
||||
return cachedDfsUsed;
|
||||
}
|
||||
return -1;
|
||||
} finally {
|
||||
sc.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the current dfsUsed to the cache file.
|
||||
*/
|
||||
void saveDfsUsed() {
|
||||
File outFile = new File(currentDir, DU_CACHE_FILE);
|
||||
if (outFile.exists()) {
|
||||
outFile.delete();
|
||||
}
|
||||
|
||||
try {
|
||||
long used = getDfsUsed();
|
||||
if (used > 0) {
|
||||
FileWriter out = new FileWriter(outFile);
|
||||
// mtime is written last, so that truncated writes won't be valid.
|
||||
out.write(Long.toString(used) + " " + Long.toString(Time.now()));
|
||||
out.flush();
|
||||
out.close();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// If write failed, the volume might be bad. Since the cache file is
|
||||
// not critical, log the error and continue.
|
||||
FsDatasetImpl.LOG.warn("Failed to write dfsUsed to " + outFile, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporary files. They get moved to the finalized block directory when
|
||||
* the block is finalized.
|
||||
|
@ -210,6 +296,7 @@ class BlockPoolSlice {
|
|||
genStamp, volume, blockFile.getParentFile(), null);
|
||||
loadRwr = false;
|
||||
}
|
||||
sc.close();
|
||||
restartMeta.delete();
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// nothing to do here
|
||||
|
@ -326,6 +413,8 @@ class BlockPoolSlice {
|
|||
}
|
||||
|
||||
void shutdown() {
|
||||
saveDfsUsed();
|
||||
dfsUsedSaved = true;
|
||||
dfsUsage.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1727,11 +1727,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addBlockPool(String bpid, Configuration conf)
|
||||
public void addBlockPool(String bpid, Configuration conf)
|
||||
throws IOException {
|
||||
LOG.info("Adding block pool " + bpid);
|
||||
volumes.addBlockPool(bpid, conf);
|
||||
volumeMap.initBlockPool(bpid);
|
||||
synchronized(this) {
|
||||
volumes.addBlockPool(bpid, conf);
|
||||
volumeMap.initBlockPool(bpid);
|
||||
}
|
||||
volumes.getAllVolumesMap(bpid, volumeMap);
|
||||
}
|
||||
|
||||
|
|
|
@ -96,10 +96,41 @@ class FsVolumeList {
|
|||
}
|
||||
}
|
||||
|
||||
void getAllVolumesMap(String bpid, ReplicaMap volumeMap) throws IOException {
|
||||
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
|
||||
long totalStartTime = System.currentTimeMillis();
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
getVolumeMap(bpid, v, volumeMap);
|
||||
final List<IOException> exceptions = Collections.synchronizedList(
|
||||
new ArrayList<IOException>());
|
||||
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
|
||||
for (final FsVolumeImpl v : volumes) {
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
|
||||
bpid + " on volume " + v + "...");
|
||||
long startTime = System.currentTimeMillis();
|
||||
v.getVolumeMap(bpid, volumeMap);
|
||||
long timeTaken = System.currentTimeMillis() - startTime;
|
||||
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
|
||||
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
|
||||
} catch (IOException ioe) {
|
||||
FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
|
||||
"from " + v + ". Will throw later.", ioe);
|
||||
exceptions.add(ioe);
|
||||
}
|
||||
}
|
||||
};
|
||||
replicaAddingThreads.add(t);
|
||||
t.start();
|
||||
}
|
||||
for (Thread t : replicaAddingThreads) {
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException ie) {
|
||||
throw new IOException(ie);
|
||||
}
|
||||
}
|
||||
if (!exceptions.isEmpty()) {
|
||||
throw exceptions.get(0);
|
||||
}
|
||||
long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
|
||||
FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
|
||||
|
@ -219,4 +250,4 @@ class FsVolumeList {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -290,10 +290,14 @@ public class UpgradeUtilities {
|
|||
if (!list[i].isFile()) {
|
||||
continue;
|
||||
}
|
||||
// skip VERSION file for DataNodes
|
||||
if (nodeType == DATA_NODE && list[i].getName().equals("VERSION")) {
|
||||
|
||||
// skip VERSION and dfsUsed file for DataNodes
|
||||
if (nodeType == DATA_NODE &&
|
||||
(list[i].getName().equals("VERSION") ||
|
||||
list[i].getName().equals("dfsUsed"))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
FileInputStream fis = null;
|
||||
try {
|
||||
fis = new FileInputStream(list[i]);
|
||||
|
|
Loading…
Reference in New Issue