diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a72ff871cef..edd99f267cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -304,6 +304,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
   public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+  public static final String
+      DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY =
+      "dfs.datanode.volumes.replica-add.threadpool.size";
   public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dfca5d6c478..80a7ca2f3bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1630,7 +1630,7 @@ public class DataNode extends ReconfigurableBase
     return blockPoolManager.get(bpid);
   }

-  int getBpOsCount() {
+  public int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index f46b6a41962..0ed5c391095 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,8 +28,19 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
@@ -52,8 +63,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -95,6 +106,17 @@ class BlockPoolSlice {
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;

+  private static ForkJoinPool addReplicaThreadPool = null;
+  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+      .getRuntime().availableProcessors();
+  public static final Comparator<File> FILE_COMPARATOR =
+      new Comparator<File>() {
+    @Override
+    public int compare(File f1, File f2) {
+      return f1.getName().compareTo(f2.getName());
+    }
+  };
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;

@@ -167,6 +189,10 @@ class BlockPoolSlice {
         .setInitialUsed(loadDfsUsed())
         .build();

+    if (addReplicaThreadPool == null) {
+      // initialize add replica fork join pool
+      initializeAddReplicaPool(conf);
+    }
     // Make the dfs usage to be saved during shutdown.
     ShutdownHookManager.get().addShutdownHook(
       new Runnable() {
@@ -174,11 +200,27 @@
       public void run() {
         if (!dfsUsedSaved) {
           saveDfsUsed();
+          addReplicaThreadPool.shutdownNow();
         }
       }
     }, SHUTDOWN_HOOK_PRIORITY);
   }

+  private synchronized void initializeAddReplicaPool(Configuration conf) {
+    if (addReplicaThreadPool == null) {
+      FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+      int numberOfBlockPoolSlice = dataset.getVolumeCount()
+          * dataset.getBPServiceCount();
+      int poolsize = Math.max(numberOfBlockPoolSlice,
+          VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+      // Default pool size is max of (volume * number of bp_service) and
+      // number of processors.
+      addReplicaThreadPool = new ForkJoinPool(conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+          poolsize));
+    }
+  }
+
   File getDirectory() {
     return currentDir.getParentFile();
   }
@@ -367,10 +409,55 @@

     boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
     if (!success) {
-      // add finalized replicas
-      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
-      // add rbw replicas
-      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+      List<IOException> exceptions = Collections
+          .synchronizedList(new ArrayList<IOException>());
+      Queue<RecursiveAction> subTaskQueue =
+          new ConcurrentLinkedQueue<RecursiveAction>();
+
+      // add finalized replicas
+      AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+          finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+      ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
+      // add rbw replicas
+      task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+          false, exceptions, subTaskQueue);
+      ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+      try {
+        finalizedTask.get();
+        rbwTask.get();
+      } catch (InterruptedException | ExecutionException e) {
+        exceptions.add(new IOException(
+            "Failed to start sub tasks to add replica in replica map :"
+                + e.getMessage()));
+      }
+
+      // wait for all the tasks to finish.
+      waitForSubTaskToFinish(subTaskQueue, exceptions);
+    }
+  }
+
+  /**
+   * Wait till all the recursive tasks for add replica to volume are completed.
+   *
+   * @param subTaskQueue
+   *          {@link AddReplicaProcessor} tasks list.
+   * @param exceptions
+   *          exceptions occurred in sub tasks.
+   * @throws IOException
+   *           thrown if any sub task or multiple sub tasks failed.
+   */
+  private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+      List<IOException> exceptions) throws IOException {
+    while (!subTaskQueue.isEmpty()) {
+      RecursiveAction task = subTaskQueue.poll();
+      if (task != null) {
+        task.join();
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
     }
   }
@@ -505,10 +592,10 @@
       }
     }

-    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-    if (oldReplica == null) {
-      volumeMap.add(bpid, newReplica);
-    } else {
+    ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+    ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+        : tmpReplicaInfo;
+    if (oldReplica != null) {
       // We have multiple replicas of the same block so decide which one
       // to keep.
       newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
@@ -537,15 +624,23 @@
    *                                storage.
    * @param isFinalized true if the directory has finalized replicas;
    *                    false if the directory has rbw replicas
+   * @param exceptions list of exceptions to return to the parent thread.
+   * @param subTaskQueue queue of sub tasks
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-      final RamDiskReplicaTracker lazyWriteReplicaMap,
-      boolean isFinalized)
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+      List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
       throws IOException {
     File[] files = fileIoProvider.listFiles(volume, dir);
-    for (File file : files) {
+    Arrays.sort(files, FILE_COMPARATOR);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
+        // Launch new sub task.
+        AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+            lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+        subTask.fork();
+        subTaskQueue.add(subTask);
       }

       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -560,7 +655,7 @@
       }

       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          files, file);
+          files, file, i);
       long blockId = Block.filename2id(file.getName());
       Block block = new Block(blockId, file.length(), genStamp);
       addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
@@ -862,4 +957,63 @@
   public long getNumOfBlocks() {
     return numOfBlocks.get();
   }
+
+  /**
+   * Recursive action for adding replicas to the map.
+   */
+  class AddReplicaProcessor extends RecursiveAction {
+
+    private ReplicaMap volumeMap;
+    private File dir;
+    private RamDiskReplicaTracker lazyWriteReplicaMap;
+    private boolean isFinalized;
+    private List<IOException> exceptions;
+    private Queue<RecursiveAction> subTaskQueue;
+
+    /**
+     * @param volumeMap
+     *          the replicas map
+     * @param dir
+     *          an input directory
+     * @param lazyWriteReplicaMap
+     *          Map of replicas on transient storage.
+     * @param isFinalized
+     *          true if the directory has finalized replicas; false if the
+     *          directory has rbw replicas
+     * @param exceptions
+     *          List of exceptions to return to the parent thread.
+     * @param subTaskQueue
+     *          queue of sub tasks
+     */
+    AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+        RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+      this.volumeMap = volumeMap;
+      this.dir = dir;
+      this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+      this.isFinalized = isFinalized;
+      this.exceptions = exceptions;
+      this.subTaskQueue = subTaskQueue;
+    }
+
+    @Override
+    protected void compute() {
+      try {
+        addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+            exceptions, subTaskQueue);
+      } catch (IOException e) {
+        LOG.warn("Caught exception while adding replicas from " + volume
+            + " in subtask. Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+  }
+
+  /**
+   * Return the size of the fork pool used for adding replicas to the map.
+   */
+  @VisibleForTesting
+  public static int getAddReplicaForkPoolSize() {
+    return addReplicaThreadPool.getPoolSize();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 1683b3fd73f..12d60cc76fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3473,6 +3473,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }

+  /**
+   * Return the BP service count.
+   */
+  public int getBPServiceCount() {
+    return datanode.getBpOsCount();
+  }
+
+  /**
+   * Return the number of volumes.
+   */
+  public int getVolumeCount() {
+    return volumes.getVolumes().size();
+  }
+
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
     try(AutoCloseableLock lock = datasetLock.acquire()) {
       for (String blockPoolId : volumeMap.getBlockPoolList()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 40ccfc2fbd0..1839f847952 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -92,21 +92,27 @@ public class FsDatasetUtil {
   }

   /**
-   * Find the meta-file for the specified block file
-   * and then return the generation stamp from the name of the meta-file.
+   * Find the meta-file for the specified block file and then return the
+   * generation stamp from the name of the meta-file. Generally the meta file
+   * will be the next file in the sorted array of files.
+   *
+   * @param listdir
+   *          sorted list of files based on name.
+   * @param blockFile
+   *          block file for which the generation stamp is needed.
+   * @param index
+   *          index of the block file in the array.
+   * @return generation stamp for the block file.
    */
-  static long getGenerationStampFromFile(File[] listdir, File blockFile)
-      throws IOException {
+  static long getGenerationStampFromFile(File[] listdir, File blockFile,
+      int index) {
     String blockName = blockFile.getName();
-    for (int j = 0; j < listdir.length; j++) {
-      String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
+    if ((index + 1) < listdir.length) {
+      // Check if the next file in the sorted list is the meta file.
+      String metaFile = listdir[index + 1].getName();
+      if (metaFile.startsWith(blockName)) {
+        return Block.getGenerationStamp(metaFile);
       }
-      if (blockFile.getCanonicalPath().equals(listdir[j].getCanonicalPath())) {
-        continue;
-      }
-      return Block.getGenerationStamp(listdir[j].getName());
     }
     FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
     return HdfsConstants.GRANDFATHER_GENERATION_STAMP;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 5d76b33f137..3e345330d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -221,8 +221,9 @@
       throw exceptions.get(0);
     }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
-    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
-        + totalTimeTaken + "ms");
+    FsDatasetImpl.LOG
+        .info("Total time to add all replicas to map for block pool " + bpid
+            + ": " + totalTimeTaken + "ms");
   }

   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 5705792cefb..56f037ae658 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -120,6 +120,30 @@
     }
   }

+  /**
+   * Add a replica's meta information into the map; if it already exists,
+   * return the old replicaInfo.
+   */
+  ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    try (AutoCloseableLock l = lock.acquire()) {
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
+        // Add an entry for block pool if it does not exist already
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
+        map.put(bpid, m);
+      }
+      ReplicaInfo oldReplicaInfo = m.get(new Block(replicaInfo.getBlockId()));
+      if (oldReplicaInfo != null) {
+        return oldReplicaInfo;
+      } else {
+        m.put(replicaInfo);
+      }
+      return replicaInfo;
+    }
+  }
+
   /**
    * Add all entries from the given replica map into the local replica map.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 10f78b452ef..5802fce6f4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1227,6 +1227,15 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
+  <value></value>
+  <description>Specifies the maximum number of threads to use for
+  adding blocks to the volume map. The default value for this configuration
+  is the max of (volume * number of bp_service, number of processor).
+  </description>
+</property>
+
 <property>
   <name>dfs.image.compress</name>
   <value>false</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index b8138c00c40..9b7ac3d5dd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

 import com.google.common.base.Preconditions;
+
 import org.apache.commons.io.FileExistsException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@@ -55,6 +57,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.nio.file.Files;
@@ -412,7 +415,16 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     File f = getBlockFile(block);
     File dir = f.getParentFile();
     File[] files = FileUtil.listFiles(dir);
-    return FsDatasetUtil.getGenerationStampFromFile(files, f);
+    Arrays.sort(files, BlockPoolSlice.FILE_COMPARATOR);
+    String blockName = f.getName();
+    for (int i = 0; i < files.length; i++) {
+      String path = files[i].getName();
+      if (!path.startsWith(blockName)) {
+        continue;
+      }
+      return FsDatasetUtil.getGenerationStampFromFile(files, f, i);
+    }
+    return HdfsConstants.GRANDFATHER_GENERATION_STAMP;
   }

   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 147b2cb1c06..92a0bd780e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -18,16 +18,22 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,12 +44,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;

 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -242,4 +252,56 @@
         conf, StorageType.ARCHIVE, usage);
     assertEquals(600, volume4.getReserved());
   }
+
+  @Test(timeout = 60000)
+  public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
+    Configuration cnf = new Configuration();
+    int poolSize = 5;
+    cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    cnf.setInt(
+        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+        poolSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(cnf).numDataNodes(1)
+        .storagesPerDatanode(1).build();
+    final DistributedFileSystem fs = cluster.getFileSystem();
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(10);
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          for (int j = 0; j < 10; j++) {
+            try {
+              DFSTestUtil.createFile(fs, new Path("File_" + getName() + j), 10,
+                  (short) 1, 0);
+            } catch (IllegalArgumentException | IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      thread.setName("FileWriter" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    fs.close();
+    FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
+        .getFSDataset();
+    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
+        .getInstance(conf, fsDataset);
+    FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    // It will create BlockPoolSlice.AddReplicaProcessor tasks and launch them
+    // in the ForkJoinPool recursively.
+    vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
+    assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
+        .size() == 1000);
+    assertTrue("Fork pool size should be " + poolSize,
+        BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
+  }
 }