diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 17229dc3e76..c8607cb8f19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -39,6 +39,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9350. Avoid creating temprorary strings in Block.toString() and
     getBlockName() (Staffan Friberg via cmccabe)
 
+    HDFS-9624. DataNode start slowly due to the initial DU command operations.
+    (Lin Yiqun via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7450730798f..718cb5808d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -120,6 +120,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION =
       "dfs.datanode.duplicate.replica.deletion";
   public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
+  public static final String DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS =
+      "dfs.datanode.cached-dfsused.check.interval.ms";
+  public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
+      600000;
+
   public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
       "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 33a88df1fad..69e0913248c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -55,7 +55,7 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
@@ -79,12 +79,15 @@ class BlockPoolSlice {
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private final int ioFileBufferSize;
-  private static final String DU_CACHE_FILE = "dfsUsed";
+  @VisibleForTesting
+  public static final String DU_CACHE_FILE = "dfsUsed";
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
   private static final String REPLICA_CACHE_FILE = "replicas";
   private final long replicaCacheExpiry = 5*60*1000;
+  private final long cachedDfsUsedCheckTime;
+  private final Timer timer;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -95,10 +98,11 @@ class BlockPoolSlice {
    * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
    * @param bpDir directory corresponding to the BlockPool
    * @param conf configuration
+   * @param timer includes methods for getting time
    * @throws IOException
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
-      Configuration conf) throws IOException {
+      Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
@@ -117,6 +121,12 @@ class BlockPoolSlice {
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
 
+    this.cachedDfsUsedCheckTime =
+        conf.getLong(
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+    this.timer = timer;
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -193,11 +203,13 @@ void incDfsUsed(long value) {
     dfsUsage.incDfsUsed(value);
   }
 
-  /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of dfsUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
+  /**
+   * Read in the cached DU value and return it if the cache file is newer
+   * than cachedDfsUsedCheckTime, which is set by the
+   * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight
+   * imprecision of dfsUsed is not critical and skipping DU can
+   * significantly shorten the startup time. If the cached value is not
+   * available or too old, -1 is returned.
    */
   long loadDfsUsed() {
     long cachedDfsUsed;
@@ -225,7 +237,7 @@ long loadDfsUsed() {
     }
 
     // Return the cached value if mtime is okay.
-    if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+    if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
       FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
           cachedDfsUsed);
       return cachedDfsUsed;
@@ -251,7 +263,7 @@ void saveDfsUsed() {
       try (Writer out = new OutputStreamWriter(
           new FileOutputStream(outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.write(Long.toString(used) + " " + Long.toString(timer.now()));
         out.flush();
       }
     } catch (IOException ioe) {
@@ -440,7 +452,7 @@ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
       try {
         sc = new Scanner(restartMeta, "UTF-8");
         // The restart meta file exists
-        if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+        if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
           // It didn't expire. Load the replica as a RBW.
           // We don't know the expected block length, so just use 0
           // and don't reserve any more space for writes.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 544169d9084..dd206c72f1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -118,6 +118,7 @@
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -132,6 +133,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
 
   private final static boolean isNativeIOAvailable;
+  private Timer timer;
   static {
     isNativeIOAvailable = NativeIO.isAvailable();
     if (Path.WINDOWS && !isNativeIOAvailable) {
@@ -438,7 +440,7 @@ public void addVolume(final StorageLocation location,
     for (final NamespaceInfo nsInfo : nsInfos) {
       String bpid = nsInfo.getBlockPoolID();
       try {
-        fsVolume.addBlockPool(bpid, this.conf);
+        fsVolume.addBlockPool(bpid, this.conf, this.timer);
         fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
@@ -3127,5 +3129,10 @@ boolean reserveLockedMemory(long bytesNeeded) {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  @VisibleForTesting
+  public void setTimer(Timer newTimer) {
+    this.timer = newTimer;
+  }
 
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 126f6672758..425809f1041 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -858,8 +859,18 @@ void shutdown() {
   }
 
   void addBlockPool(String bpid, Configuration conf) throws IOException {
+    addBlockPool(bpid, conf, null);
+  }
+
+  void addBlockPool(String bpid, Configuration conf, Timer timer)
+      throws IOException {
     File bpdir = new File(currentDir, bpid);
-    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+    BlockPoolSlice bp;
+    if (timer == null) {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+    } else {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+    }
     bpSlices.put(bpid, bp);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 30b7b538e62..8b3c363d0a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2721,4 +2721,17 @@
     reduces initial request failures after datanode restart.
   </description>
 </property>
+
+<property>
+  <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
+  <value>600000</value>
+  <description>
+  How long the cached dfsUsed value (DU_CACHE_FILE) in each volume stays
+  valid. During a rolling upgrade the cache file of each volume usually
+  expires, forcing the datanode to rerun the du operations, which makes
+  the datanode start slowly. Increase this property to keep the cache
+  file valid for as long as you need.
+  </description>
+</property>
+
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index a3d57691829..cdc1d6183b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,13 +58,18 @@
 import org.mockito.stubbing.Answer;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -444,4 +450,95 @@ public void testDuplicateReplicaResolution() throws IOException {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  @Test
+  public void testLoadingDfsUsedForVolumes() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Initialize cachedDfsUsedIntervalTime to be larger than waitIntervalTime
+    // so the cached dfsUsed value does not expire
+    long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    assertEquals(cacheDfsUsed, dfsUsed);
+  }
+
+  @Test
+  public void testLoadingDfsUsedForVolumesExpired() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Initialize cachedDfsUsedIntervalTime to be smaller than waitIntervalTime
+    // so the cached dfsUsed value expires
+    long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    // Because the cached dfsUsed value expired, dfsUsed is recalculated
+    assertTrue(cacheDfsUsed != dfsUsed);
+  }
+
+  private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
+      long waitIntervalTime) throws IOException, InterruptedException {
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
+
+    String CURRENT_DIR = "current";
+    String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
+    String path = BASE_DIR + "/newData0";
+    String pathUri = new Path(path).toUri().toString();
+    StorageLocation loc = StorageLocation.parse(pathUri);
+    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+    DataStorage.VolumeBuilder builder =
+        new DataStorage.VolumeBuilder(storage, sd);
+    when(
+        storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+            anyListOf(NamespaceInfo.class))).thenReturn(builder);
+
+    String cacheFilePath =
+        String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
+            CURRENT_DIR, DU_CACHE_FILE);
+    File outFile = new File(cacheFilePath);
+
+    if (!outFile.getParentFile().exists()) {
+      outFile.getParentFile().mkdirs();
+    }
+
+    if (outFile.exists()) {
+      outFile.delete();
+    }
+
+    FakeTimer timer = new FakeTimer();
+    try {
+      try (Writer out =
+          new OutputStreamWriter(new FileOutputStream(outFile),
+              StandardCharsets.UTF_8)) {
+        // Write the dfsUsed value and the time to the cache file
+        out.write(Long.toString(cacheDfsUsed) + " "
+            + Long.toString(timer.now()));
+        out.flush();
+      }
+    } catch (IOException ioe) {
+    }
+
+    dataset.setTimer(timer);
+    timer.advance(waitIntervalTime);
+    dataset.addVolume(loc, nsInfos);
+
+    // Get the last volume, which was just added above
+    FsVolumeImpl newVolume;
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
+    }
+    long dfsUsed = newVolume.getDfsUsed();
+
+    return dfsUsed;
+  }
 }
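
Note for operators: the dfs.datanode.cached-dfsused.check.interval.ms key introduced above (default 600000 ms, i.e. 10 minutes) controls how long the cached dfsUsed value stays valid. A minimal hdfs-site.xml override might look like the sketch below; the one-hour value is only illustrative and not part of the patch.

<!-- Illustrative hdfs-site.xml override: keep the cached dfsUsed value
     valid for one hour so a rolling upgrade does not force a full du
     rescan of every volume at datanode startup. The 3600000 ms value is
     an example; the shipped default is 600000 ms. -->
<property>
  <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
  <value>3600000</value>
</property>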