From fc1031af749435dc95efea6745b1b2300ce29446 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Wed, 25 Mar 2015 14:42:28 -0500 Subject: [PATCH] HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/protocol/BlockListAsLongs.java | 37 +++ .../hadoop/hdfs/server/datanode/DataNode.java | 7 +- .../fsdataset/impl/BlockPoolSlice.java | 268 +++++++++++++----- .../fsdataset/impl/FsDatasetImpl.java | 3 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 8 +- .../datanode/fsdataset/impl/FsVolumeList.java | 7 +- .../apache/hadoop/hdfs/UpgradeUtilities.java | 5 +- .../server/datanode/DataNodeTestUtils.java | 7 + .../fsdataset/impl/TestWriteToReplica.java | 154 +++++++++- .../namenode/TestListCorruptFileBlocks.java | 6 + .../namenode/TestProcessCorruptBlocks.java | 3 + .../ha/TestPendingCorruptDnMessages.java | 3 + 13 files changed, 431 insertions(+), 80 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8f1d5fc5324..62c2f9174c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED HDFS-7713. Implement mkdirs in the HDFS Web UI. (Ravi Prakash via wheat9) + HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes + a lot of time if disks are busy (Rushabh S Shah via kihwal) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java index 1c89ee4a342..834546b6605 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.protocol; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,6 +35,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -108,6 +111,40 @@ public static BlockListAsLongs encode( return builder.build(); } + public static BlockListAsLongs readFrom(InputStream is) throws IOException { + CodedInputStream cis = CodedInputStream.newInstance(is); + int numBlocks = -1; + ByteString blocksBuf = null; + while (!cis.isAtEnd()) { + int tag = cis.readTag(); + int field = WireFormat.getTagFieldNumber(tag); + switch(field) { + case 0: + break; + case 1: + numBlocks = (int)cis.readInt32(); + break; + case 2: + blocksBuf = cis.readBytes(); + break; + default: + cis.skipField(tag); + break; + } + } + if (numBlocks != -1 && blocksBuf != null) { + return decodeBuffer(numBlocks, blocksBuf); + } + return null; + } + + public void writeTo(OutputStream os) throws IOException { + CodedOutputStream cos = CodedOutputStream.newInstance(os); + cos.writeInt32(1, getNumberOfBlocks()); + cos.writeBytes(2, getBlocksBuffer()); + cos.flush(); + } + public static Builder builder() { return new BlockListAsLongs.Builder(); } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index d94375e8c65..3368124464c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -41,8 +41,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; @@ -159,6 +157,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -2500,6 +2499,10 @@ public BlockScanner getBlockScanner() { return blockScanner; } + @VisibleForTesting + DirectoryScanner getDirectoryScanner() { + return directoryScanner; + } public static void secureMain(String args[], SecureResources resources) { int errorCode = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 5a69e1e4d66..6daf03944ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -23,12 +23,12 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStreamWriter; import java.io.RandomAccessFile; import java.io.Writer; +import java.util.Iterator; import java.util.Scanner; import org.apache.commons.io.FileUtils; @@ -39,6 +39,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; @@ -55,6 +57,7 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.Time; +import com.google.common.io.Files; /** * A block pool 
slice represents a portion of a block pool stored on a volume. * Taken together, all BlockPoolSlices sharing a block pool ID across a @@ -77,7 +80,9 @@ class BlockPoolSlice { private volatile boolean dfsUsedSaved = false; private static final int SHUTDOWN_HOOK_PRIORITY = 30; private final boolean deleteDuplicateReplicas; - + private static final String REPLICA_CACHE_FILE = "replicas"; + private final long replicaCacheExpiry = 5*60*1000; + // TODO:FEDERATION scalability issue - a thread per DU is needed private final DU dfsUsage; @@ -310,11 +315,14 @@ void getVolumeMap(ReplicaMap volumeMap, FsDatasetImpl.LOG.info( "Recovered " + numRecovered + " replicas from " + lazypersistDir); } - - // add finalized replicas - addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true); - // add rbw replicas - addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false); + + boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap); + if (!success) { + // add finalized replicas + addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true); + // add rbw replicas + addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false); + } } /** @@ -401,6 +409,75 @@ private int moveLazyPersistReplicasToFinalized(File source) FileUtil.fullyDelete(source); return numRecovered; } + + private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap, + final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized) + throws IOException { + ReplicaInfo newReplica = null; + long blockId = block.getBlockId(); + long genStamp = block.getGenerationStamp(); + if (isFinalized) { + newReplica = new FinalizedReplica(blockId, + block.getNumBytes(), genStamp, volume, DatanodeUtil + .idToBlockDir(finalizedDir, blockId)); + } else { + File file = new File(rbwDir, block.getBlockName()); + boolean loadRwr = true; + File restartMeta = new File(file.getParent() + + File.pathSeparator + "." + file.getName() + ".restart"); + Scanner sc = null; + try { + sc = new Scanner(restartMeta, "UTF-8"); + // The restart meta file exists + if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { + // It didn't expire. Load the replica as a RBW. + // We don't know the expected block length, so just use 0 + // and don't reserve any more space for writes. + newReplica = new ReplicaBeingWritten(blockId, + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile(), null, 0); + loadRwr = false; + } + sc.close(); + if (!restartMeta.delete()) { + FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + + restartMeta.getPath()); + } + } catch (FileNotFoundException fnfe) { + // nothing to do hereFile dir = + } finally { + if (sc != null) { + sc.close(); + } + } + // Restart meta doesn't exist or expired. + if (loadRwr) { + newReplica = new ReplicaWaitingToBeRecovered(blockId, + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile()); + } + } + + ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId()); + if (oldReplica == null) { + volumeMap.add(bpid, newReplica); + } else { + // We have multiple replicas of the same block so decide which one + // to keep. + newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap); + } + + // If we are retaining a replica on transient storage make sure + // it is in the lazyWriteReplicaMap so it can be persisted + // eventually. 
+ if (newReplica.getVolume().isTransientStorage()) { + lazyWriteReplicaMap.addReplica(bpid, blockId, + (FsVolumeImpl) newReplica.getVolume()); + } else { + lazyWriteReplicaMap.discardReplica(bpid, blockId, false); + } + } + /** * Add replicas under the given directory to the volume map @@ -434,66 +511,9 @@ void addToReplicasMap(ReplicaMap volumeMap, File dir, long genStamp = FsDatasetUtil.getGenerationStampFromFile( files, file); long blockId = Block.filename2id(file.getName()); - ReplicaInfo newReplica = null; - if (isFinalized) { - newReplica = new FinalizedReplica(blockId, - file.length(), genStamp, volume, file.getParentFile()); - } else { - - boolean loadRwr = true; - File restartMeta = new File(file.getParent() + - File.pathSeparator + "." + file.getName() + ".restart"); - Scanner sc = null; - try { - sc = new Scanner(restartMeta, "UTF-8"); - // The restart meta file exists - if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { - // It didn't expire. Load the replica as a RBW. - // We don't know the expected block length, so just use 0 - // and don't reserve any more space for writes. - newReplica = new ReplicaBeingWritten(blockId, - validateIntegrityAndSetLength(file, genStamp), - genStamp, volume, file.getParentFile(), null, 0); - loadRwr = false; - } - sc.close(); - if (!restartMeta.delete()) { - FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " + - restartMeta.getPath()); - } - } catch (FileNotFoundException fnfe) { - // nothing to do hereFile dir = - } finally { - if (sc != null) { - sc.close(); - } - } - // Restart meta doesn't exist or expired. - if (loadRwr) { - newReplica = new ReplicaWaitingToBeRecovered(blockId, - validateIntegrityAndSetLength(file, genStamp), - genStamp, volume, file.getParentFile()); - } - } - - ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId()); - if (oldReplica == null) { - volumeMap.add(bpid, newReplica); - } else { - // We have multiple replicas of the same block so decide which one - // to keep. - newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap); - } - - // If we are retaining a replica on transient storage make sure - // it is in the lazyWriteReplicaMap so it can be persisted - // eventually. - if (newReplica.getVolume().isTransientStorage()) { - lazyWriteReplicaMap.addReplica(bpid, blockId, - (FsVolumeImpl) newReplica.getVolume()); - } else { - lazyWriteReplicaMap.discardReplica(bpid, blockId, false); - } + Block block = new Block(blockId, file.length(), genStamp); + addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, + isFinalized); } } @@ -649,9 +669,121 @@ public String toString() { return currentDir.getAbsolutePath(); } - void shutdown() { + void shutdown(BlockListAsLongs blocksListToPersist) { + saveReplicas(blocksListToPersist); saveDfsUsed(); dfsUsedSaved = true; dfsUsage.shutdown(); } + + private boolean readReplicasFromCache(ReplicaMap volumeMap, + final RamDiskReplicaTracker lazyWriteReplicaMap) { + ReplicaMap tmpReplicaMap = new ReplicaMap(this); + File replicaFile = new File(currentDir, REPLICA_CACHE_FILE); + // Check whether the file exists or not. 
+ if (!replicaFile.exists()) { + LOG.info("Replica Cache file: "+ replicaFile.getPath() + + " doesn't exist "); + return false; + } + long fileLastModifiedTime = replicaFile.lastModified(); + if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) { + LOG.info("Replica Cache file: " + replicaFile.getPath() + + " has gone stale"); + // Just to make findbugs happy + if (!replicaFile.delete()) { + LOG.info("Replica Cache file: " + replicaFile.getPath() + + " cannot be deleted"); + } + return false; + } + FileInputStream inputStream = null; + try { + inputStream = new FileInputStream(replicaFile); + BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream); + Iterator iterator = blocksList.iterator(); + while (iterator.hasNext()) { + BlockReportReplica replica = iterator.next(); + switch (replica.getState()) { + case FINALIZED: + addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true); + break; + case RUR: + case RBW: + case RWR: + addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false); + break; + default: + break; + } + } + inputStream.close(); + // Now it is safe to add the replica into volumeMap + // In case of any exception during parsing this cache file, fall back + // to scan all the files on disk. + for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) { + volumeMap.add(bpid, info); + } + LOG.info("Successfully read replica from cache file : " + + replicaFile.getPath()); + return true; + } catch (Exception e) { + // Any exception we need to revert back to read from disk + // Log the error and return false + LOG.info("Exception occured while reading the replicas cache file: " + + replicaFile.getPath(), e ); + return false; + } + finally { + if (!replicaFile.delete()) { + LOG.info("Failed to delete replica cache file: " + + replicaFile.getPath()); + } + // close the inputStream + IOUtils.closeStream(inputStream); + } + } + + private void saveReplicas(BlockListAsLongs blocksListToPersist) { + if (blocksListToPersist == null || + blocksListToPersist.getNumberOfBlocks()== 0) { + return; + } + File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp"); + if (tmpFile.exists() && !tmpFile.delete()) { + LOG.warn("Failed to delete tmp replicas file in " + + tmpFile.getPath()); + return; + } + File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE); + if (replicaCacheFile.exists() && !replicaCacheFile.delete()) { + LOG.warn("Failed to delete replicas file in " + + replicaCacheFile.getPath()); + return; + } + + FileOutputStream out = null; + try { + out = new FileOutputStream(tmpFile); + blocksListToPersist.writeTo(out); + out.close(); + // Renaming the tmp file to replicas + Files.move(tmpFile, replicaCacheFile); + } catch (Exception e) { + // If write failed, the volume might be bad. Since the cache file is + // not critical, log the error, delete both the files (tmp and cache) + // and continue. 
+ LOG.warn("Failed to write replicas to cache ", e); + if (replicaCacheFile.exists() && !replicaCacheFile.delete()) { + LOG.warn("Failed to delete replicas file: " + + replicaCacheFile.getPath()); + } + } finally { + IOUtils.closeStream(out); + if (tmpFile.exists() && !tmpFile.delete()) { + LOG.warn("Failed to delete tmp file in " + + tmpFile.getPath()); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 05c48718f70..cf471ca655a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -2463,8 +2463,9 @@ public void addBlockPool(String bpid, Configuration conf) @Override public synchronized void shutdownBlockPool(String bpid) { LOG.info("Removing block pool " + bpid); + Map blocksPerVolume = getBlockReports(bpid); volumeMap.cleanUpBlockPool(bpid); - volumes.removeBlockPool(bpid); + volumes.removeBlockPool(bpid, blocksPerVolume); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 23efbdf8bbf..4dbc7f17ee3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; @@ -65,7 +66,6 @@ import org.apache.hadoop.util.Time; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -805,7 +805,7 @@ void shutdown() { } Set> set = bpSlices.entrySet(); for (Entry entry : set) { - entry.getValue().shutdown(); + entry.getValue().shutdown(null); } } @@ -815,10 +815,10 @@ void addBlockPool(String bpid, Configuration conf) throws IOException { bpSlices.put(bpid, bp); } - void shutdownBlockPool(String bpid) { + void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) { BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) { - bp.shutdown(); + bp.shutdown(blocksListsAsLongs); } bpSlices.remove(bpid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index a5611c5b191..4fddfb9e644 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -35,10 +35,12 @@ import 
com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; @@ -428,9 +430,10 @@ public void run() { bpid + ": " + totalTimeTaken + "ms"); } - void removeBlockPool(String bpid) { + void removeBlockPool(String bpid, Map + blocksPerVolume) { for (FsVolumeImpl v : volumes.get()) { - v.shutdownBlockPool(bpid); + v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage())); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java index 2e5348e8009..e9891bfe595 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java @@ -304,10 +304,11 @@ public static void checksumContentsHelper(NodeType nodeType, File dir, continue; } - // skip VERSION and dfsUsed file for DataNodes + // skip VERSION and dfsUsed and replicas file for DataNodes if (nodeType == DATA_NODE && (list[i].getName().equals("VERSION") || - list[i].getName().equals("dfsUsed"))) { + list[i].getName().equals("dfsUsed") || + list[i].getName().equals("replicas"))) { continue; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index f9a2ba139bf..9dee7249952 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -218,4 +218,11 @@ public static void restoreDataDirFromFailure(File... 
dirs) } } } + + public static void runDirectoryScanner(DataNode dn) throws IOException { + DirectoryScanner directoryScanner = dn.getDirectoryScanner(); + if (directoryScanner != null) { + dn.getDirectoryScanner().reconcile(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index 9325cdcf733..96a73c64a13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -17,14 +17,25 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import java.io.IOException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; @@ -34,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.junit.Assert; import org.junit.Test; @@ -501,4 +513,144 @@ private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) + "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]); } } + + /** + * This is a test to check the replica map before and after the datanode + * quick restart (less than 5 minutes) + * @throws Exception + */ + @Test + public void testReplicaMapAfterDatanodeRestart() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .build(); + try { + cluster.waitActive(); + NameNode nn1 = cluster.getNameNode(0); + NameNode nn2 = cluster.getNameNode(1); + assertNotNull("cannot create nn1", nn1); + assertNotNull("cannot create nn2", nn2); + + // check number of volumes in fsdataset + DataNode dn = cluster.getDataNodes().get(0); + FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils. 
+ getFSDataset(dn); + ReplicaMap replicaMap = dataSet.volumeMap; + + List volumes = dataSet.getVolumes(); + // number of volumes should be 2 - [data1, data2] + assertEquals("number of volumes is wrong", 2, volumes.size()); + ArrayList bpList = new ArrayList(Arrays.asList( + cluster.getNamesystem(0).getBlockPoolId(), + cluster.getNamesystem(1).getBlockPoolId())); + + Assert.assertTrue("Cluster should have 2 block pools", + bpList.size() == 2); + + createReplicas(bpList, volumes, replicaMap); + ReplicaMap oldReplicaMap = new ReplicaMap(this); + oldReplicaMap.addAll(replicaMap); + + cluster.restartDataNode(0); + cluster.waitActive(); + dn = cluster.getDataNodes().get(0); + dataSet = (FsDatasetImpl) dn.getFSDataset(); + testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList); + } finally { + cluster.shutdown(); + } + } + + /** + * Compare the replica map before and after the restart + **/ + private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap + newReplicaMap, List bpidList) { + // Traversing through newReplica map and remove the corresponding + // replicaInfo from oldReplicaMap. + for (String bpid: bpidList) { + for (ReplicaInfo info: newReplicaMap.replicas(bpid)) { + assertNotNull("Volume map before restart didn't contain the " + + "blockpool: " + bpid, oldReplicaMap.replicas(bpid)); + + ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid, + info.getBlockId()); + // Volume map after restart contains a blockpool id which + assertNotNull("Old Replica Map didnt't contain block with blockId: " + + info.getBlockId(), oldReplicaInfo); + + ReplicaState oldState = oldReplicaInfo.getState(); + // Since after restart, all the RWR, RBW and RUR blocks gets + // converted to RWR + if (info.getState() == ReplicaState.RWR) { + if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW + || oldState == ReplicaState.RUR) { + oldReplicaMap.remove(bpid, oldReplicaInfo); + } + } else if (info.getState() == ReplicaState.FINALIZED && + oldState == ReplicaState.FINALIZED) { + oldReplicaMap.remove(bpid, oldReplicaInfo); + } + } + } + + // We don't persist the ReplicaInPipeline replica + // and if the old replica map contains any replica except ReplicaInPipeline + // then we didn't persist that replica + for (String bpid: bpidList) { + for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) { + if (replicaInfo.getState() != ReplicaState.TEMPORARY) { + Assert.fail("After datanode restart we lost the block with blockId: " + + replicaInfo.getBlockId()); + } + } + } + } + + private void createReplicas(List bpList, List volumes, + ReplicaMap volumeMap) throws IOException { + Assert.assertTrue("Volume map can't be null" , volumeMap != null); + + // Here we create all different type of replicas and add it + // to volume map. 
+    // Create each type of ReplicaInfo under each block pool and volume
+    long id = 1; // This variable is used as both blockId and genStamp
+    for (String bpId: bpList) {
+      for (FsVolumeImpl volume: volumes) {
+        ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
+            DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
+        volumeMap.add(bpId, finalizedReplica);
+        id++;
+
+        ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume,
+            volume.getRbwDir(bpId), null, 100);
+        volumeMap.add(bpId, rbwReplica);
+        id++;
+
+        ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id,
+            volume, volume.getRbwDir(bpId));
+        volumeMap.add(bpId, rwrReplica);
+        id++;
+
+        ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume,
+            volume.getTmpDir(bpId), 0);
+        volumeMap.add(bpId, ripReplica);
+        id++;
+      }
+    }
+
+    for (String bpId: bpList) {
+      for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
+        File parentFile = replicaInfo.getBlockFile().getParentFile();
+        if (!parentFile.exists()) {
+          if (!parentFile.mkdirs()) {
+            throw new IOException("Failed to mkdirs " + parentFile);
+          }
+        }
+        replicaInfo.getBlockFile().createNewFile();
+        replicaInfo.getMetaFile().createNewFile();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 92ea111495e..5d319b43a32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -41,6 +41,8 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -483,6 +485,10 @@ public void testMaxCorruptFiles() throws Exception {
       }
     }
 
+    // Run the DirectoryScanner to update the DataNode's volumeMap
+    DataNode dn = cluster.getDataNodes().get(0);
+    DataNodeTestUtils.runDirectoryScanner(dn);
+
     // Occasionally the BlockPoolSliceScanner can run before we have removed
     // the blocks. Restart the Datanode to trigger the scanner into running
     // once more.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
index 168ebb988ac..37abc5b726d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 public class TestProcessCorruptBlocks {
@@ -269,6 +270,8 @@ private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path file
     // But the datadirectory will not change
     assertTrue(cluster.corruptReplica(dnIndex, block));
 
+    // Run directory scanner to update the DN's volume map
+    DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
     DataNodeProperties dnProps = cluster.stopDataNode(0);
 
     // Each datanode has multiple data dirs, check each
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
index 4d4fed63aa6..443500cdc26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
@@ -69,6 +70,8 @@ public void testChangedStorageId() throws IOException, URISyntaxException,
     ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
     assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
 
+    // Run the directory scanner to update the DataNode's volumeMap
+    DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
     // Stop the DN so the replica with the changed gen stamp will be reported
     // when this DN starts up.
     DataNodeProperties dnProps = cluster.stopDataNode(0);
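
Note on the cache file format: BlockListAsLongs#writeTo persists the block
report with a minimal two-field protobuf framing (field 1 carries the block
count as an int32, field 2 carries the packed block longs as bytes), and
readFrom skips unknown fields so the format can evolve. The standalone sketch
below illustrates that framing with protobuf-java's CodedOutputStream and
CodedInputStream; the class name and the string payload are stand-ins for
illustration only, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;

public class ReplicaCacheFramingSketch {
  public static void main(String[] args) throws IOException {
    // Stand-in for the packed BlockListAsLongs buffer.
    ByteString blocksBuf = ByteString.copyFromUtf8("packed-block-longs");

    // Write side, mirroring writeTo: field 1 is the block count,
    // field 2 is the opaque block buffer.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CodedOutputStream cos = CodedOutputStream.newInstance(baos);
    cos.writeInt32(1, 3);
    cos.writeBytes(2, blocksBuf);
    cos.flush();

    // Read side, mirroring readFrom: unknown fields are skipped so an
    // old reader can still parse a newer file.
    CodedInputStream cis =
        CodedInputStream.newInstance(new ByteArrayInputStream(baos.toByteArray()));
    int numBlocks = -1;
    ByteString decoded = null;
    while (!cis.isAtEnd()) {
      int tag = cis.readTag();
      switch (WireFormat.getTagFieldNumber(tag)) {
      case 1:
        numBlocks = cis.readInt32();
        break;
      case 2:
        decoded = cis.readBytes();
        break;
      default:
        cis.skipField(tag);
        break;
      }
    }
    System.out.println(numBlocks + " blocks, "
        + (decoded == null ? 0 : decoded.size()) + " payload bytes");
  }
}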
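The replica cache is only trusted across a quick restart: readReplicasFromCache
deletes and ignores a "replicas" file older than five minutes, and saveReplicas
writes to a "replicas.tmp" file that is moved into place so a partially written
cache is never read; any failure falls back to the full addToReplicasMap scan.
A distilled sketch of that lifecycle follows (the file names and expiry match
the patch; the helper class itself is illustrative):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import com.google.common.io.Files;

public class ReplicaCacheLifecycleSketch {
  // Matches the five-minute expiry hard-coded in BlockPoolSlice.
  private static final long CACHE_EXPIRY_MS = 5 * 60 * 1000;

  /** Write the cache into the block pool's current directory and publish it. */
  static void publish(File currentDir, byte[] encodedBlockList) throws IOException {
    File tmp = new File(currentDir, "replicas.tmp");
    File cache = new File(currentDir, "replicas");
    FileOutputStream out = new FileOutputStream(tmp);
    try {
      out.write(encodedBlockList);
    } finally {
      out.close();
    }
    // Rename into place so a reader never observes a half-written cache file.
    Files.move(tmp, cache);
  }

  /** A stale or missing cache means startup falls back to scanning the disks. */
  static boolean isUsable(File currentDir) {
    File cache = new File(currentDir, "replicas");
    return cache.exists()
        && System.currentTimeMillis() <= cache.lastModified() + CACHE_EXPIRY_MS;
  }
}

The expiry is what keeps the fast path limited to short restarts such as a
rolling-upgrade roll; after a longer outage the DataNode reverts to scanning
the finalized and rbw directories.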