HDFS-9624. DataNode start slowly due to the initial DU command operations. (Lin Yiqun via wang)

(cherry picked from commit c07f7fa8ff)
This commit is contained in:
Andrew Wang 2016-01-15 11:28:46 -08:00
parent ee012c1f6f
commit 46323d7744
7 changed files with 161 additions and 13 deletions

View File

@ -39,6 +39,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9350. Avoid creating temprorary strings in Block.toString() and HDFS-9350. Avoid creating temprorary strings in Block.toString() and
getBlockName() (Staffan Friberg via cmccabe) getBlockName() (Staffan Friberg via cmccabe)
HDFS-9624. DataNode start slowly due to the initial DU command operations.
(Lin Yiqun via wang)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -120,6 +120,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion"; public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true; public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
public static final String DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS =
"dfs.datanode.cached-dfsused.check.interval.ms";
public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
600000;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT = public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent"; "dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f; public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

View File

@ -55,7 +55,7 @@
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -79,12 +79,15 @@ class BlockPoolSlice {
private final File rbwDir; // directory store RBW replica private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica private final File tmpDir; // directory store Temporary replica
private final int ioFileBufferSize; private final int ioFileBufferSize;
private static final String DU_CACHE_FILE = "dfsUsed"; @VisibleForTesting
public static final String DU_CACHE_FILE = "dfsUsed";
private volatile boolean dfsUsedSaved = false; private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final boolean deleteDuplicateReplicas; private final boolean deleteDuplicateReplicas;
private static final String REPLICA_CACHE_FILE = "replicas"; private static final String REPLICA_CACHE_FILE = "replicas";
private final long replicaCacheExpiry = 5*60*1000; private final long replicaCacheExpiry = 5*60*1000;
private final long cachedDfsUsedCheckTime;
private final Timer timer;
// TODO:FEDERATION scalability issue - a thread per DU is needed // TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage; private final DU dfsUsage;
@ -95,10 +98,11 @@ class BlockPoolSlice {
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
* @param bpDir directory corresponding to the BlockPool * @param bpDir directory corresponding to the BlockPool
* @param conf configuration * @param conf configuration
* @param timer include methods for getting time
* @throws IOException * @throws IOException
*/ */
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir, BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
Configuration conf) throws IOException { Configuration conf, Timer timer) throws IOException {
this.bpid = bpid; this.bpid = bpid;
this.volume = volume; this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
@ -117,6 +121,12 @@ class BlockPoolSlice {
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT); DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
this.cachedDfsUsedCheckTime =
conf.getLong(
DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
this.timer = timer;
// Files that were being written when the datanode was last shutdown // Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that // are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local // in the future, we might want to do some sort of datanode-local
@ -193,11 +203,13 @@ void incDfsUsed(long value) {
dfsUsage.incDfsUsed(value); dfsUsage.incDfsUsed(value);
} }
/** /**
* Read in the cached DU value and return it if it is less than 600 seconds * Read in the cached DU value and return it if it is less than
* old (DU update interval). Slight imprecision of dfsUsed is not critical * cachedDfsUsedCheckTime which is set by
* and skipping DU can significantly shorten the startup time. * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight imprecision
* If the cached value is not available or too old, -1 is returned. * of dfsUsed is not critical and skipping DU can significantly shorten the
* startup time. If the cached value is not available or too old, -1 is
* returned.
*/ */
long loadDfsUsed() { long loadDfsUsed() {
long cachedDfsUsed; long cachedDfsUsed;
@ -225,7 +237,7 @@ long loadDfsUsed() {
} }
// Return the cached value if mtime is okay. // Return the cached value if mtime is okay.
if (mtime > 0 && (Time.now() - mtime < 600000L)) { if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " + FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
cachedDfsUsed); cachedDfsUsed);
return cachedDfsUsed; return cachedDfsUsed;
@ -251,7 +263,7 @@ void saveDfsUsed() {
try (Writer out = new OutputStreamWriter( try (Writer out = new OutputStreamWriter(
new FileOutputStream(outFile), "UTF-8")) { new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid. // mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(Time.now())); out.write(Long.toString(used) + " " + Long.toString(timer.now()));
out.flush(); out.flush();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
@ -440,7 +452,7 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
try { try {
sc = new Scanner(restartMeta, "UTF-8"); sc = new Scanner(restartMeta, "UTF-8");
// The restart meta file exists // The restart meta file exists
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
// It didn't expire. Load the replica as a RBW. // It didn't expire. Load the replica as a RBW.
// We don't know the expected block length, so just use 0 // We don't know the expected block length, so just use 0
// and don't reserve any more space for writes. // and don't reserve any more space for writes.

View File

@ -118,6 +118,7 @@
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -132,6 +133,7 @@
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Log LOG = LogFactory.getLog(FsDatasetImpl.class); static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
private final static boolean isNativeIOAvailable; private final static boolean isNativeIOAvailable;
private Timer timer;
static { static {
isNativeIOAvailable = NativeIO.isAvailable(); isNativeIOAvailable = NativeIO.isAvailable();
if (Path.WINDOWS && !isNativeIOAvailable) { if (Path.WINDOWS && !isNativeIOAvailable) {
@ -438,7 +440,7 @@ public void addVolume(final StorageLocation location,
for (final NamespaceInfo nsInfo : nsInfos) { for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID(); String bpid = nsInfo.getBlockPoolID();
try { try {
fsVolume.addBlockPool(bpid, this.conf); fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker); fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume + LOG.warn("Caught exception when adding " + fsVolume +
@ -3127,5 +3129,10 @@ boolean reserveLockedMemory(long bytesNeeded) {
evictLazyPersistBlocks(bytesNeeded); evictLazyPersistBlocks(bytesNeeded);
return cacheManager.reserve(bytesNeeded) > 0; return cacheManager.reserve(bytesNeeded) > 0;
} }
@VisibleForTesting
public void setTimer(Timer newTimer) {
this.timer = newTimer;
}
} }

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -858,8 +859,18 @@ void shutdown() {
} }
void addBlockPool(String bpid, Configuration conf) throws IOException { void addBlockPool(String bpid, Configuration conf) throws IOException {
addBlockPool(bpid, conf, null);
}
void addBlockPool(String bpid, Configuration conf, Timer timer)
throws IOException {
File bpdir = new File(currentDir, bpid); File bpdir = new File(currentDir, bpid);
BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf); BlockPoolSlice bp;
if (timer == null) {
bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
} else {
bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
}
bpSlices.put(bpid, bp); bpSlices.put(bpid, bp);
} }

View File

@ -2721,4 +2721,17 @@
reduces initial request failures after datanode restart. reduces initial request failures after datanode restart.
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.cached-dfsused.check.interval.ms</name>
<value>600000</value>
<description>
The interval check time of loading DU_CACHE_FILE in each volume.
When the cluster doing the rolling upgrade operations, it will
usually lead dfsUsed cache file of each volume expired and redo the
du operations in datanode and that makes datanode start slowly. Adjust
this property can make cache file be available for the time as you want.
</description>
</property>
</configuration> </configuration>

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -57,13 +58,18 @@
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -444,4 +450,95 @@ public void testDuplicateReplicaResolution() throws IOException {
assertSame(replica, assertSame(replica,
BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
} }
@Test
public void testLoadingDfsUsedForVolumes() throws IOException,
InterruptedException {
long waitIntervalTime = 5000;
// Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime
// to avoid cache-dfsused time expired
long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
cachedDfsUsedIntervalTime);
long cacheDfsUsed = 1024;
long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
assertEquals(cacheDfsUsed, dfsUsed);
}
@Test
public void testLoadingDfsUsedForVolumesExpired() throws IOException,
InterruptedException {
long waitIntervalTime = 5000;
// Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime
// to make cache-dfsused time expired
long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
cachedDfsUsedIntervalTime);
long cacheDfsUsed = 1024;
long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
// Because the cache-dfsused expired and the dfsUsed will be recalculated
assertTrue(cacheDfsUsed != dfsUsed);
}
private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
long waitIntervalTime) throws IOException, InterruptedException {
List<NamespaceInfo> nsInfos = Lists.newArrayList();
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
String CURRENT_DIR = "current";
String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
String path = BASE_DIR + "/newData0";
String pathUri = new Path(path).toUri().toString();
StorageLocation loc = StorageLocation.parse(pathUri);
Storage.StorageDirectory sd = createStorageDirectory(new File(path));
DataStorage.VolumeBuilder builder =
new DataStorage.VolumeBuilder(storage, sd);
when(
storage.prepareVolume(eq(datanode), eq(loc.getFile()),
anyListOf(NamespaceInfo.class))).thenReturn(builder);
String cacheFilePath =
String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
CURRENT_DIR, DU_CACHE_FILE);
File outFile = new File(cacheFilePath);
if (!outFile.getParentFile().exists()) {
outFile.getParentFile().mkdirs();
}
if (outFile.exists()) {
outFile.delete();
}
FakeTimer timer = new FakeTimer();
try {
try (Writer out =
new OutputStreamWriter(new FileOutputStream(outFile),
StandardCharsets.UTF_8)) {
// Write the dfsUsed value and the time to cache file
out.write(Long.toString(cacheDfsUsed) + " "
+ Long.toString(timer.now()));
out.flush();
}
} catch (IOException ioe) {
}
dataset.setTimer(timer);
timer.advance(waitIntervalTime);
dataset.addVolume(loc, nsInfos);
// Get the last volume which was just added before
FsVolumeImpl newVolume;
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
}
long dfsUsed = newVolume.getDfsUsed();
return dfsUsed;
}
} }