diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 51900a484cf..ba1498cc195 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -156,6 +156,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_FIXED_VOLUME_SIZE_KEY = "dfs.datanode.fixed.volume.size"; public static final boolean DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT = false; + public static final String DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY = + "dfs.datanode.replica.cache.root.dir"; + public static final String DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_KEY = + "dfs.datanode.replica.cache.expiry.time"; + public static final long DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_DEFAULT = + 300000; // This setting is for testing/internal use only. public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 1843faa733b..fc84c4d280c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -44,6 +44,7 @@ import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed; import org.slf4j.Logger; @@ -103,7 +104,8 @@ class BlockPoolSlice { private static final int SHUTDOWN_HOOK_PRIORITY = 30; private final boolean deleteDuplicateReplicas; private static final String REPLICA_CACHE_FILE = "replicas"; - private final long replicaCacheExpiry = 5*60*1000; + private final long replicaCacheExpiry; + private final File replicaCacheDir; private AtomicLong numOfBlocks = new AtomicLong(); private final long cachedDfsUsedCheckTime; private final Timer timer; @@ -180,6 +182,24 @@ public int compare(File f1, File f2) { fileIoProvider.mkdirs(volume, rbwDir); fileIoProvider.mkdirs(volume, tmpDir); + String cacheDirRoot = conf.get( + DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY); + if (cacheDirRoot != null && !cacheDirRoot.isEmpty()) { + this.replicaCacheDir = new File(cacheDirRoot, + currentDir.getCanonicalPath()); + if (!this.replicaCacheDir.exists()) { + if (!this.replicaCacheDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + this.replicaCacheDir); + } + } + } else { + this.replicaCacheDir = currentDir; + } + this.replicaCacheExpiry = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_KEY, + DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_EXPIRY_TIME_DEFAULT, + TimeUnit.MILLISECONDS); + // Use cached value initially if available. Or the following call will // block until the initial du command completes. this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid) @@ -876,7 +896,7 @@ void shutdown(BlockListAsLongs blocksListToPersist) { private boolean readReplicasFromCache(ReplicaMap volumeMap, final RamDiskReplicaTracker lazyWriteReplicaMap) { ReplicaMap tmpReplicaMap = new ReplicaMap(new ReentrantReadWriteLock()); - File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); + File replicaFile = new File(replicaCacheDir, REPLICA_CACHE_FILE); // Check whether the file exists or not. if (!replicaFile.exists()) { LOG.info("Replica Cache file: "+ replicaFile.getPath() + @@ -954,8 +974,8 @@ private void saveReplicas(BlockListAsLongs blocksListToPersist) { blocksListToPersist.getNumberOfBlocks()== 0) { return; } - final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp"); - final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE); + final File tmpFile = new File(replicaCacheDir, REPLICA_CACHE_FILE + ".tmp"); + final File replicaCacheFile = new File(replicaCacheDir, REPLICA_CACHE_FILE); if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) || !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) { return; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 0b7d13af725..ede3dc09c0b 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4516,6 +4516,23 @@ + + dfs.datanode.replica.cache.root.dir + + + Use this key to change root dir of replica cache. + The default root dir is currentDir. + + + + + dfs.datanode.replica.cache.expiry.time + 5m + + Living time of replica cached files in milliseconds. + + + dfs.ha.fencing.methods diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 39cbdd0ad5e..d911ef1ba26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -113,6 +113,7 @@ public class TestFsDatasetImpl { Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class); private static final String BASE_DIR = new FileSystemTestHelper().getTestRootDir(); + private String replicaCacheRootDir = BASE_DIR + Path.SEPARATOR + "cache"; private static final int NUM_INIT_VOLUMES = 2; private static final String CLUSTER_ID = "cluser-id"; private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"}; @@ -172,6 +173,8 @@ public void setUp() throws IOException { storage = mock(DataStorage.class); this.conf = new Configuration(); this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); + this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY, + replicaCacheRootDir); when(datanode.getConf()).thenReturn(conf); final DNConf dnConf = new DNConf(datanode); @@ -963,4 +966,25 @@ public void testDataDirWithPercent() throws IOException { new StorageDirectory(StorageLocation.parse(dataDir.getPath()))) .build(); } + + @Test + public void testReplicaCacheFileToOtherPlace() throws IOException { + final String bpid = "bpid-0"; + for (int i = 0; i < 5; i++) { + ExtendedBlock eb = new ExtendedBlock(bpid, i); + dataset.createRbw(StorageType.DEFAULT, null, eb, false); + } + List cacheFiles = new ArrayList<>(); + for (FsVolumeSpi vol: dataset.getFsVolumeReferences()) { + BlockPoolSlice bpSlice = ((FsVolumeImpl)vol).getBlockPoolSlice(bpid); + File cacheFile = new File(replicaCacheRootDir + Path.SEPARATOR + + bpSlice.getDirectory().getCanonicalPath() + Path.SEPARATOR + + DataStorage.STORAGE_DIR_CURRENT + Path.SEPARATOR + "replicas"); + cacheFiles.add(cacheFile); + } + dataset.shutdownBlockPool(bpid); + for (File f : cacheFiles) { + assertTrue(f.exists()); + } + } }