HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
Kihwal Lee 2015-03-25 14:42:28 -05:00
parent c62840d531
commit be4eabdcd4
13 changed files with 431 additions and 80 deletions

File: hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -24,6 +24,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7713. Implement mkdirs in the HDFS Web UI. (Ravi Prakash via wheat9)
HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
a lot of time if disks are busy (Rushabh S Shah via kihwal)
OPTIMIZATIONS
BUG FIXES

File: BlockListAsLongs.java

@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +35,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -108,6 +111,40 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
return builder.build();
}
public static BlockListAsLongs readFrom(InputStream is) throws IOException {
CodedInputStream cis = CodedInputStream.newInstance(is);
int numBlocks = -1;
ByteString blocksBuf = null;
while (!cis.isAtEnd()) {
int tag = cis.readTag();
int field = WireFormat.getTagFieldNumber(tag);
switch(field) {
case 0:
break;
case 1:
numBlocks = (int)cis.readInt32();
break;
case 2:
blocksBuf = cis.readBytes();
break;
default:
cis.skipField(tag);
break;
}
}
if (numBlocks != -1 && blocksBuf != null) {
return decodeBuffer(numBlocks, blocksBuf);
}
return null;
}
public void writeTo(OutputStream os) throws IOException {
CodedOutputStream cos = CodedOutputStream.newInstance(os);
cos.writeInt32(1, getNumberOfBlocks());
cos.writeBytes(2, getBlocksBuffer());
cos.flush();
}
public static Builder builder() {
return new BlockListAsLongs.Builder();
}
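The two methods added above give BlockListAsLongs a simple stream codec: writeTo emits the block count as protobuf field 1 and the packed block longs as field 2, and readFrom re-parses them tolerantly, returning null if either field is missing. A minimal round-trip sketch (not part of the patch; it assumes an existing BlockListAsLongs instance, e.g. from a block report, and that BlockReportReplica exposes getBlockId()/getState() as used elsewhere in this commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;

class BlockListRoundTrip {
  // Serialize a block list and read it back, the same way BlockPoolSlice
  // persists and reloads its "replicas" cache file.
  static BlockListAsLongs roundTrip(BlockListAsLongs blocks) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    blocks.writeTo(out);
    BlockListAsLongs decoded =
        BlockListAsLongs.readFrom(new ByteArrayInputStream(out.toByteArray()));
    if (decoded != null) {            // null means the count or buffer field was missing
      for (BlockReportReplica replica : decoded) {
        System.out.println(replica.getBlockId() + " " + replica.getState());
      }
    }
    return decoded;
  }
}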

File: DataNode.java

@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMOR
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
@@ -160,6 +158,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
@@ -2507,6 +2506,10 @@ public class DataNode extends ReconfigurableBase
return blockScanner;
}
@VisibleForTesting
DirectoryScanner getDirectoryScanner() {
return directoryScanner;
}
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;

File: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -23,12 +23,12 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.Writer;
import java.util.Iterator;
import java.util.Scanner;
import org.apache.commons.io.FileUtils;
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -55,6 +57,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Time;
import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
@@ -77,6 +80,8 @@ class BlockPoolSlice {
private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final boolean deleteDuplicateReplicas;
private static final String REPLICA_CACHE_FILE = "replicas";
private final long replicaCacheExpiry = 5*60*1000;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage;
@@ -317,11 +322,14 @@ class BlockPoolSlice {
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
if (!success) {
// add finalized replicas
addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
// add rbw replicas
addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
}
}
/**
* Recover an unlinked tmp file on datanode restart. If the original block
@@ -408,44 +416,18 @@ class BlockPoolSlice {
return numRecovered;
}
private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized)
throws IOException {
ReplicaInfo newReplica = null;
long blockId = block.getBlockId();
long genStamp = block.getGenerationStamp();
if (isFinalized) {
newReplica = new FinalizedReplica(blockId,
block.getNumBytes(), genStamp, volume, DatanodeUtil
.idToBlockDir(finalizedDir, blockId));
} else {
File file = new File(rbwDir, block.getBlockName());
boolean loadRwr = true;
File restartMeta = new File(file.getParent() +
File.pathSeparator + "." + file.getName() + ".restart");
@@ -501,6 +483,44 @@ class BlockPoolSlice {
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}
}
/**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
* @param lazyWriteReplicaMap Map of replicas on transient
* storage.
* @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir,
final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
File files[] = FileUtil.listFiles(dir);
for (File file : files) {
if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
}
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
file = recoverTempUnlinkedBlock(file);
if (file == null) { // the original block still exists, so we cover it
// in another iteration and can continue here
continue;
}
}
if (!Block.isBlockFilename(file))
continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
long blockId = Block.filename2id(file.getName());
Block block = new Block(blockId, file.length(), genStamp);
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
isFinalized);
}
} }
/** /**
@@ -655,9 +675,121 @@ class BlockPoolSlice {
return currentDir.getAbsolutePath();
}
void shutdown(BlockListAsLongs blocksListToPersist) {
saveReplicas(blocksListToPersist);
saveDfsUsed();
dfsUsedSaved = true;
dfsUsage.shutdown();
}
private boolean readReplicasFromCache(ReplicaMap volumeMap,
final RamDiskReplicaTracker lazyWriteReplicaMap) {
ReplicaMap tmpReplicaMap = new ReplicaMap(this);
File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
// Check whether the file exists or not.
if (!replicaFile.exists()) {
LOG.info("Replica Cache file: "+ replicaFile.getPath() +
" doesn't exist ");
return false;
}
long fileLastModifiedTime = replicaFile.lastModified();
if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
LOG.info("Replica Cache file: " + replicaFile.getPath() +
" has gone stale");
// Just to make findbugs happy
if (!replicaFile.delete()) {
LOG.info("Replica Cache file: " + replicaFile.getPath() +
" cannot be deleted");
}
return false;
}
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(replicaFile);
BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream);
Iterator<BlockReportReplica> iterator = blocksList.iterator();
while (iterator.hasNext()) {
BlockReportReplica replica = iterator.next();
switch (replica.getState()) {
case FINALIZED:
addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
break;
case RUR:
case RBW:
case RWR:
addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false);
break;
default:
break;
}
}
inputStream.close();
// Now it is safe to add the replica into volumeMap
// In case of any exception during parsing this cache file, fall back
// to scan all the files on disk.
for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) {
volumeMap.add(bpid, info);
}
LOG.info("Successfully read replica from cache file : "
+ replicaFile.getPath());
return true;
} catch (Exception e) {
// Any exception we need to revert back to read from disk
// Log the error and return false
LOG.info("Exception occured while reading the replicas cache file: "
+ replicaFile.getPath(), e );
return false;
}
finally {
if (!replicaFile.delete()) {
LOG.info("Failed to delete replica cache file: " +
replicaFile.getPath());
}
// close the inputStream
IOUtils.closeStream(inputStream);
}
}
private void saveReplicas(BlockListAsLongs blocksListToPersist) {
if (blocksListToPersist == null ||
blocksListToPersist.getNumberOfBlocks()== 0) {
return;
}
File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
if (tmpFile.exists() && !tmpFile.delete()) {
LOG.warn("Failed to delete tmp replicas file in " +
tmpFile.getPath());
return;
}
File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
LOG.warn("Failed to delete replicas file in " +
replicaCacheFile.getPath());
return;
}
FileOutputStream out = null;
try {
out = new FileOutputStream(tmpFile);
blocksListToPersist.writeTo(out);
out.close();
// Renaming the tmp file to replicas
Files.move(tmpFile, replicaCacheFile);
} catch (Exception e) {
// If write failed, the volume might be bad. Since the cache file is
// not critical, log the error, delete both the files (tmp and cache)
// and continue.
LOG.warn("Failed to write replicas to cache ", e);
if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
LOG.warn("Failed to delete replicas file: " +
replicaCacheFile.getPath());
}
} finally {
IOUtils.closeStream(out);
if (tmpFile.exists() && !tmpFile.delete()) {
LOG.warn("Failed to delete tmp file in " +
tmpFile.getPath());
}
}
}
}
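readReplicasFromCache and saveReplicas above implement a write-to-temp-then-rename, read-once, expire-after-five-minutes cache: the file is only trusted if it is fresh, is deleted as soon as it is read, and a failed or stale read falls back to the full disk scan. A stand-alone sketch of that same pattern (hypothetical SingleUseCache class and file names, not the BlockPoolSlice code itself):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

class SingleUseCache {
  private static final long EXPIRY_MS = 5 * 60 * 1000;  // same 5-minute window as replicaCacheExpiry

  // Publish via rename so readers never observe a half-written cache file.
  static void save(File dir, String name, byte[] payload) throws IOException {
    File tmp = new File(dir, name + ".tmp");
    Files.write(tmp.toPath(), payload);
    Files.move(tmp.toPath(), new File(dir, name).toPath(),
        StandardCopyOption.REPLACE_EXISTING);
  }

  // Return the payload only if the cache is fresh; always delete it so a
  // stale copy can never be trusted twice. Callers fall back to a full
  // directory scan when this returns null.
  static byte[] loadOnce(File dir, String name) throws IOException {
    File cache = new File(dir, name);
    try {
      if (!cache.exists()
          || System.currentTimeMillis() > cache.lastModified() + EXPIRY_MS) {
        return null;
      }
      return Files.readAllBytes(cache.toPath());
    } finally {
      cache.delete();
    }
  }
}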

File: FsDatasetImpl.java

@@ -2463,8 +2463,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public synchronized void shutdownBlockPool(String bpid) {
LOG.info("Removing block pool " + bpid);
Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
volumeMap.cleanUpBlockPool(bpid);
volumes.removeBlockPool(bpid, blocksPerVolume);
}
/**

File: FsVolumeImpl.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -65,7 +66,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -805,7 +805,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
entry.getValue().shutdown(null);
}
}
@@ -815,10 +815,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
bpSlices.put(bpid, bp);
}
void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
bp.shutdown(blocksListsAsLongs);
}
bpSlices.remove(bpid);
}

File: FsVolumeList.java

@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
@@ -428,9 +430,10 @@ class FsVolumeList {
bpid + ": " + totalTimeTaken + "ms");
}
void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
blocksPerVolume) {
for (FsVolumeImpl v : volumes.get()) {
v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
}
}
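Taken together, the FsDatasetImpl, FsVolumeImpl and FsVolumeList hunks thread a per-storage block report down to each volume at block-pool shutdown, which is what ultimately writes the replica cache. A condensed view of the call path, using only names that appear in the hunks above (a comment-style trace, not a literal excerpt):

// FsDatasetImpl.shutdownBlockPool(bpid)
//   blocksPerVolume = getBlockReports(bpid)            // Map<DatanodeStorage, BlockListAsLongs>
//   volumes.removeBlockPool(bpid, blocksPerVolume)     // FsVolumeList
//     v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()))  // each FsVolumeImpl
//       bp.shutdown(blocksListsAsLongs)                // BlockPoolSlice
//         saveReplicas(blocksListToPersist)            // writes the "replicas" cache file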

File: UpgradeUtilities.java

@@ -304,10 +304,11 @@ public class UpgradeUtilities {
continue;
}
// skip VERSION and dfsUsed and replicas file for DataNodes
if (nodeType == DATA_NODE &&
(list[i].getName().equals("VERSION") ||
list[i].getName().equals("dfsUsed") ||
list[i].getName().equals("replicas"))) {
continue;
}

File: DataNodeTestUtils.java

@@ -218,4 +218,11 @@ public class DataNodeTestUtils {
}
}
}
public static void runDirectoryScanner(DataNode dn) throws IOException {
DirectoryScanner directoryScanner = dn.getDirectoryScanner();
if (directoryScanner != null) {
dn.getDirectoryScanner().reconcile();
}
}
}

File: TestWriteToReplica.java

@@ -17,14 +17,25 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@@ -34,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@@ -501,4 +513,144 @@ public class TestWriteToReplica {
+ "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
}
}
/**
* This is a test to check the replica map before and after the datanode
* quick restart (less than 5 minutes)
* @throws Exception
*/
@Test
public void testReplicaMapAfterDatanodeRestart() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build();
try {
cluster.waitActive();
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
assertNotNull("cannot create nn1", nn1);
assertNotNull("cannot create nn2", nn2);
// check number of volumes in fsdataset
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
getFSDataset(dn);
ReplicaMap replicaMap = dataSet.volumeMap;
List<FsVolumeImpl> volumes = dataSet.getVolumes();
// number of volumes should be 2 - [data1, data2]
assertEquals("number of volumes is wrong", 2, volumes.size());
ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
cluster.getNamesystem(0).getBlockPoolId(),
cluster.getNamesystem(1).getBlockPoolId()));
Assert.assertTrue("Cluster should have 2 block pools",
bpList.size() == 2);
createReplicas(bpList, volumes, replicaMap);
ReplicaMap oldReplicaMap = new ReplicaMap(this);
oldReplicaMap.addAll(replicaMap);
cluster.restartDataNode(0);
cluster.waitActive();
dn = cluster.getDataNodes().get(0);
dataSet = (FsDatasetImpl) dn.getFSDataset();
testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList);
} finally {
cluster.shutdown();
}
}
/**
* Compare the replica map before and after the restart
**/
private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap
newReplicaMap, List<String> bpidList) {
// Traversing through newReplica map and remove the corresponding
// replicaInfo from oldReplicaMap.
for (String bpid: bpidList) {
for (ReplicaInfo info: newReplicaMap.replicas(bpid)) {
assertNotNull("Volume map before restart didn't contain the "
+ "blockpool: " + bpid, oldReplicaMap.replicas(bpid));
ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid,
info.getBlockId());
// Volume map after restart contains a blockpool id which
assertNotNull("Old Replica Map didnt't contain block with blockId: " +
info.getBlockId(), oldReplicaInfo);
ReplicaState oldState = oldReplicaInfo.getState();
// Since after restart, all the RWR, RBW and RUR blocks gets
// converted to RWR
if (info.getState() == ReplicaState.RWR) {
if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW
|| oldState == ReplicaState.RUR) {
oldReplicaMap.remove(bpid, oldReplicaInfo);
}
} else if (info.getState() == ReplicaState.FINALIZED &&
oldState == ReplicaState.FINALIZED) {
oldReplicaMap.remove(bpid, oldReplicaInfo);
}
}
}
// We don't persist the ReplicaInPipeline replica
// and if the old replica map contains any replica except ReplicaInPipeline
// then we didn't persist that replica
for (String bpid: bpidList) {
for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) {
if (replicaInfo.getState() != ReplicaState.TEMPORARY) {
Assert.fail("After datanode restart we lost the block with blockId: "
+ replicaInfo.getBlockId());
}
}
}
}
private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes,
ReplicaMap volumeMap) throws IOException {
Assert.assertTrue("Volume map can't be null" , volumeMap != null);
// Here we create all different type of replicas and add it
// to volume map.
// Created all type of ReplicaInfo, each under Blkpool corresponding volume
long id = 1; // This variable is used as both blockId and genStamp
for (String bpId: bpList) {
for (FsVolumeImpl volume: volumes) {
ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
volumeMap.add(bpId, finalizedReplica);
id++;
ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume,
volume.getRbwDir(bpId), null, 100);
volumeMap.add(bpId, rbwReplica);
id++;
ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id,
volume, volume.getRbwDir(bpId));
volumeMap.add(bpId, rwrReplica);
id++;
ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume,
volume.getTmpDir(bpId), 0);
volumeMap.add(bpId, ripReplica);
id++;
}
}
for (String bpId: bpList) {
for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
File parentFile = replicaInfo.getBlockFile().getParentFile();
if (!parentFile.exists()) {
if (!parentFile.mkdirs()) {
throw new IOException("Failed to mkdirs " + parentFile);
}
}
replicaInfo.getBlockFile().createNewFile();
replicaInfo.getMetaFile().createNewFile();
}
}
}
}

File: TestListCorruptFileBlocks.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
@@ -483,6 +485,10 @@ public class TestListCorruptFileBlocks {
}
}
// Run the directoryScanner to update the Datanode's volumeMap
DataNode dn = cluster.getDataNodes().get(0);
DataNodeTestUtils.runDirectoryScanner(dn);
// Occasionally the BlockPoolSliceScanner can run before we have removed
// the blocks. Restart the Datanode to trigger the scanner into running
// once more.

File: TestProcessCorruptBlocks.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test;
public class TestProcessCorruptBlocks {
@@ -269,6 +270,8 @@ public class TestProcessCorruptBlocks {
// But the datadirectory will not change
assertTrue(cluster.corruptReplica(dnIndex, block));
// Run directory scanner to update the DN's volume map
DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// Each datanode has multiple data dirs, check each

File: TestPendingCorruptDnMessages.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Test;
@@ -69,6 +70,8 @@ public class TestPendingCorruptDnMessages {
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
// Run the directory scanner to update the Datanode's volumeMap
DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
// Stop the DN so the replica with the changed gen stamp will be reported
// when this DN starts up.
DataNodeProperties dnProps = cluster.stopDataNode(0);