HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.

(cherry picked from commit 5689355783)
Yiqun Lin 2018-10-02 09:43:14 +08:00
parent c306da08ec
commit 65af98b58a
9 changed files with 300 additions and 29 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -359,6 +359,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
   public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+  public static final String
+      DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY =
+      "dfs.datanode.volumes.replica-add.threadpool.size";
   public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
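Note that the new key has no *_DEFAULT companion constant: its default is computed at runtime in BlockPoolSlice.initializeAddReplicaPool (further down in this patch). As a minimal sketch, assuming a standard HdfsConfiguration client, the key could be overridden programmatically like this (the value 8 is purely illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Cap the replica-add fork-join pool at 8 threads (illustrative value).
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY, 8);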

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1695,7 +1695,7 @@ public class DataNode extends ReconfigurableBase
     return blockPoolManager.get(bpid);
   }
 
-  int getBpOsCount() {
+  public int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -28,8 +28,19 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -52,8 +63,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -96,6 +107,17 @@ class BlockPoolSlice {
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
 
+  private static ForkJoinPool addReplicaThreadPool = null;
+  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+      .getRuntime().availableProcessors();
+  private static final Comparator<File> FILE_COMPARATOR =
+      new Comparator<File>() {
+    @Override
+    public int compare(File f1, File f2) {
+      return f1.getName().compareTo(f2.getName());
+    }
+  };
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
@@ -161,13 +183,15 @@ class BlockPoolSlice {
         .setConf(conf)
         .setInitialUsed(loadDfsUsed())
         .build();
+    // initialize add replica fork join pool
+    initializeAddReplicaPool(conf);
 
     // Make the dfs usage to be saved during shutdown.
     shutdownHook = new Runnable() {
       @Override
       public void run() {
         if (!dfsUsedSaved) {
           saveDfsUsed();
+          addReplicaThreadPool.shutdownNow();
         }
       }
     };
@@ -175,6 +199,21 @@ class BlockPoolSlice {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
+  private synchronized void initializeAddReplicaPool(Configuration conf) {
+    if (addReplicaThreadPool == null) {
+      FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+      int numberOfBlockPoolSlice = dataset.getVolumeCount()
+          * dataset.getBPServiceCount();
+      int poolsize = Math.max(numberOfBlockPoolSlice,
+          VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+      // Default pool size is the max of (volumes * number of bp services)
+      // and the number of processors.
+      addReplicaThreadPool = new ForkJoinPool(conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+          poolsize));
+    }
+  }
+
   File getDirectory() {
     return currentDir.getParentFile();
   }
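A worked example of that default sizing, on hypothetical hardware: a DataNode with 12 volumes serving 2 block pools gives numberOfBlockPoolSlice = 12 * 2 = 24; on a 16-core machine, max(24, 16) = 24, so the pool is created with 24 threads unless dfs.datanode.volumes.replica-add.threadpool.size is set explicitly.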
@@ -374,10 +413,55 @@ class BlockPoolSlice {
     boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
     if (!success) {
+      List<IOException> exceptions = Collections
+          .synchronizedList(new ArrayList<IOException>());
+      Queue<RecursiveAction> subTaskQueue =
+          new ConcurrentLinkedQueue<RecursiveAction>();
       // add finalized replicas
-      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+      AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+          finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+      ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
       // add rbw replicas
-      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+      task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+          false, exceptions, subTaskQueue);
+      ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+      try {
+        finalizedTask.get();
+        rbwTask.get();
+      } catch (InterruptedException | ExecutionException e) {
+        exceptions.add(new IOException(
+            "Failed to start sub tasks to add replica in replica map :"
+                + e.getMessage()));
+      }
+
+      // wait for all the tasks to finish.
+      waitForSubTaskToFinish(subTaskQueue, exceptions);
+    }
+  }
+
+  /**
+   * Wait till all the recursive tasks for adding replicas to the volume map
+   * have completed.
+   *
+   * @param subTaskQueue {@link AddReplicaProcessor} tasks list.
+   * @param exceptions exceptions occurred in sub tasks.
+   * @throws IOException thrown if any sub task or multiple sub tasks failed.
+   */
+  private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+      List<IOException> exceptions) throws IOException {
+    while (!subTaskQueue.isEmpty()) {
+      RecursiveAction task = subTaskQueue.poll();
+      if (task != null) {
+        task.join();
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
   }
@@ -526,10 +610,10 @@ class BlockPoolSlice {
       }
     }
 
-    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-    if (oldReplica == null) {
-      volumeMap.add(bpid, newReplica);
-    } else {
+    ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+    ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+        : tmpReplicaInfo;
+    if (oldReplica != null) {
       // We have multiple replicas of the same block so decide which one
       // to keep.
       newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
@@ -558,15 +642,23 @@ class BlockPoolSlice {
    * storage.
    * @param isFinalized true if the directory has finalized replicas;
    *          false if the directory has rbw replicas
+   * @param exceptions list of exceptions which need to be returned to the
+   *          parent thread.
+   * @param subTaskQueue queue of sub tasks
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-      final RamDiskReplicaTracker lazyWriteReplicaMap,
-      boolean isFinalized)
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+      List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
       throws IOException {
     File[] files = fileIoProvider.listFiles(volume, dir);
-    for (File file : files) {
+    Arrays.sort(files, FILE_COMPARATOR);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
+        // Launch a new sub task.
+        AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+            lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+        subTask.fork();
+        subTaskQueue.add(subTask);
       }
 
       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -581,7 +673,7 @@ class BlockPoolSlice {
       }
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          files, file);
+          files, file, i);
       long blockId = Block.filename2id(file.getName());
       Block block = new Block(blockId, file.length(), genStamp);
       addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
@@ -886,4 +978,63 @@ class BlockPoolSlice {
   public long getNumOfBlocks() {
     return numOfBlocks.get();
   }
+
+  /**
+   * Recursive action to add replicas to the replica map.
+   */
+  class AddReplicaProcessor extends RecursiveAction {
+
+    private ReplicaMap volumeMap;
+    private File dir;
+    private RamDiskReplicaTracker lazyWriteReplicaMap;
+    private boolean isFinalized;
+    private List<IOException> exceptions;
+    private Queue<RecursiveAction> subTaskQueue;
+
+    /**
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param lazyWriteReplicaMap map of replicas on transient storage
+     * @param isFinalized true if the directory has finalized replicas;
+     *          false if the directory has rbw replicas
+     * @param exceptions list of exceptions which need to be returned to the
+     *          parent thread
+     * @param subTaskQueue queue of sub tasks
+     */
+    AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+        RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+      this.volumeMap = volumeMap;
+      this.dir = dir;
+      this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+      this.isFinalized = isFinalized;
+      this.exceptions = exceptions;
+      this.subTaskQueue = subTaskQueue;
+    }
+
+    @Override
+    protected void compute() {
+      try {
+        addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+            exceptions, subTaskQueue);
+      } catch (IOException e) {
+        LOG.warn("Caught exception while adding replicas from " + volume
+            + " in subtask. Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+  }
+
+  /**
+   * Return the size of the fork pool used for adding replicas to the map.
+   */
+  @VisibleForTesting
+  public static int getAddReplicaForkPoolSize() {
+    return addReplicaThreadPool.getPoolSize();
+  }
 }
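The heart of the patch is the pattern spread across getVolumeMap, addToReplicasMap, and AddReplicaProcessor above: fork one RecursiveAction per subdirectory, record it in a shared queue, and join every queued task at the end, because joining only the root task would return before forked children finish. A self-contained sketch of the same pattern, counting files in a directory tree; class and variable names are illustrative and not part of the patch:

    import java.io.File;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.atomic.AtomicLong;

    public class ParallelDirScan {

      static class ScanTask extends RecursiveAction {
        private final File dir;
        private final Queue<RecursiveAction> subTaskQueue;
        private final AtomicLong fileCount;

        ScanTask(File dir, Queue<RecursiveAction> subTaskQueue,
            AtomicLong fileCount) {
          this.dir = dir;
          this.subTaskQueue = subTaskQueue;
          this.fileCount = fileCount;
        }

        @Override
        protected void compute() {
          File[] files = dir.listFiles();
          if (files == null) {
            return; // unreadable directory
          }
          for (File f : files) {
            if (f.isDirectory()) {
              // Fork one sub task per subdirectory and remember it so the
              // caller can join every task, not just the root.
              ScanTask sub = new ScanTask(f, subTaskQueue, fileCount);
              sub.fork();
              subTaskQueue.add(sub);
            } else {
              fileCount.incrementAndGet();
            }
          }
        }
      }

      public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        Queue<RecursiveAction> subTaskQueue = new ConcurrentLinkedQueue<>();
        AtomicLong fileCount = new AtomicLong();
        pool.invoke(new ScanTask(new File(args[0]), subTaskQueue, fileCount));
        // invoke() only waits for the root task; drain the queue so every
        // forked descendant has finished before the count is read.
        while (!subTaskQueue.isEmpty()) {
          RecursiveAction task = subTaskQueue.poll();
          if (task != null) {
            task.join();
          }
        }
        System.out.println("files: " + fileCount.get());
      }
    }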

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -3307,6 +3307,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }
 
+  /**
+   * Return the number of BP services.
+   */
+  public int getBPServiceCount() {
+    return datanode.getBpOsCount();
+  }
+
+  /**
+   * Return the number of volumes.
+   */
+  public int getVolumeCount() {
+    return volumes.getVolumes().size();
+  }
+
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (String blockPoolId : volumeMap.getBlockPoolList()) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java

@@ -117,21 +117,27 @@ public class FsDatasetUtil {
   }
 
   /**
-   * Find the meta-file for the specified block file
-   * and then return the generation stamp from the name of the meta-file.
+   * Find the meta-file for the specified block file and then return the
+   * generation stamp from the name of the meta-file. Generally the meta file
+   * will be the next file in the sorted array of files.
+   *
+   * @param listdir sorted list of files based on name.
+   * @param blockFile block file for which generation stamp is needed.
+   * @param index index of block file in array.
+   * @return generation stamp for block file.
    */
-  static long getGenerationStampFromFile(File[] listdir, File blockFile)
-      throws IOException {
+  static long getGenerationStampFromFile(File[] listdir, File blockFile,
+      int index) {
     String blockName = blockFile.getName();
-    for (int j = 0; j < listdir.length; j++) {
-      String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
-      }
-      if (blockFile.getCanonicalPath().equals(listdir[j].getCanonicalPath())) {
-        continue;
+    if ((index + 1) < listdir.length) {
+      // Check if the next file in the sorted listing is the meta file.
+      String metaFile = listdir[index + 1].getName();
+      if (metaFile.startsWith(blockName)) {
+        return Block.getGenerationStamp(metaFile);
       }
-      return Block.getGenerationStamp(listdir[j].getName());
     }
     FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
     return HdfsConstants.GRANDFATHER_GENERATION_STAMP;
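This rewrite relies on block file naming: for block ids of equal digit length (the usual case, since ids are monotonically increasing longs), blk_<id> sorts immediately before its meta file blk_<id>_<genstamp>.meta. Once addToReplicasMap sorts the directory listing, the meta file is simply the next array element, turning the previous O(n) scan per block into an O(1) lookup. A hypothetical sorted listing:

    blk_1073741825
    blk_1073741825_1001.meta
    blk_1073741826
    blk_1073741826_1002.meta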

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -226,8 +226,9 @@ class FsVolumeList {
       throw exceptions.get(0);
     }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
-    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
-        + totalTimeTaken + "ms");
+    FsDatasetImpl.LOG
+        .info("Total time to add all replicas to map for block pool " + bpid
+            + ": " + totalTimeTaken + "ms");
   }
 
   /**

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

@@ -135,6 +135,31 @@ class ReplicaMap {
     }
   }
 
+  /**
+   * Add a replica's meta information into the map; if it already exists,
+   * return the old replica info.
+   */
+  ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    try (AutoCloseableLock l = lock.acquire()) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
+        // Add an entry for the block pool if it does not exist already
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
+      }
+      ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
+          LONG_AND_BLOCK_COMPARATOR);
+      if (oldReplicaInfo != null) {
+        return oldReplicaInfo;
+      } else {
+        set.add(replicaInfo);
+      }
+      return replicaInfo;
+    }
+  }
+
   /**
    * Add all entries from the given replica map into the local replica map.
    */

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1278,6 +1278,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
+  <value></value>
+  <description>Specifies the maximum number of threads to use for
+  adding blocks to the volume map. The default value for this configuration
+  is the max of (number of volumes * number of bp services) and the number
+  of processors.
+  </description>
+</property>
+
 <property>
   <name>dfs.image.compress</name>
   <value>false</value>
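A deployment that wants a fixed pool size instead of the computed default would set the property in hdfs-site.xml; the value 16 below is illustrative:

    <property>
      <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
      <value>16</value>
    </property>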

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java

@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -30,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +46,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -315,4 +325,56 @@ public class TestFsVolumeList {
         .build();
     assertEquals(600, volume4.getReserved());
   }
+
+  @Test(timeout = 60000)
+  public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
+    Configuration cnf = new Configuration();
+    int poolSize = 5;
+    cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    cnf.setInt(
+        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+        poolSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(cnf).numDataNodes(1)
+        .storagesPerDatanode(1).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(10);
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          for (int j = 0; j < 10; j++) {
+            try {
+              DFSTestUtil.createFile(fs, new Path("File_" + getName() + j), 10,
+                  (short) 1, 0);
+            } catch (IllegalArgumentException | IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      thread.setName("FileWriter" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation.
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    fs.close();
+    FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
+        .getFSDataset();
+    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
+        .getInstance(conf, fsDataset);
+    FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    // This creates BlockPoolSlice.AddReplicaProcessor tasks and launches
+    // them in the ForkJoinPool recursively.
+    vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
+    assertTrue("Failed to add all the replicas to map",
+        volumeMap.replicas(bpid).size() == 1000);
+    assertTrue("Fork pool size should be " + poolSize,
+        BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
+  }
 }
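For context on the assertions: the test submits 100 writer tasks that each create 10 single-block files (10 bytes, replication 1), so the rebuilt volume map should hold 100 * 10 = 1000 replicas, and the pool size check verifies that the configured value 5 overrode the computed default. Running just this class through Maven Surefire is the standard invocation (not part of the patch):

    mvn test -Dtest=TestFsVolumeList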