diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 13a17b870f5..88c96315c68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -238,6 +238,13 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3559. DFSTestUtil: use Builder class to construct DFSTestUtil instances. (Colin Patrick McCabe via atm) + HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection. + (szetszwo) + + HDFS-3157. Fix a bug in the case that the generation stamps of the stored + block in a namenode and the reported block from a datanode do not match. + (Ashish Singhi via szetszwo) + BREAKDOWN OF HDFS-3042 SUBTASKS HDFS-2185. HDFS portion of ZK-based FailoverController (todd) @@ -256,9 +263,6 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3428. Move DelegationTokenRenewer to common (tucu) - HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection. - (szetszwo) - HDFS-3491. HttpFs does not set permissions correctly (tucu) HDFS-3580. incompatible types; no instance(s) of type variable(s) V exist diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a1ee49ea84e..068537bd3d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -930,78 +930,71 @@ public class BlockManager { + blk + " not found."); return; } - markBlockAsCorrupt(storedBlock, dn, reason); + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn); } - private void markBlockAsCorrupt(BlockInfo storedBlock, - DatanodeInfo dn, - String reason) throws IOException { - assert storedBlock != null : "storedBlock should not be null"; + private void markBlockAsCorrupt(BlockToMarkCorrupt b, + DatanodeInfo dn) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot mark block " + - storedBlock.getBlockName() + - " as corrupt because datanode " + dn + - " does not exist. "); + throw new IOException("Cannot mark " + b + + " as corrupt because datanode " + dn + " does not exist"); } - BlockCollection bc = storedBlock.getBlockCollection(); + BlockCollection bc = b.corrupted.getBlockCollection(); if (bc == null) { - NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " + - "block " + storedBlock + - " could not be marked as corrupt as it" + - " does not belong to any file"); - addToInvalidates(storedBlock, node); + NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " + b + + " cannot be marked as corrupt as it does not belong to any file"); + addToInvalidates(b.corrupted, node); return; } // Add replica to the data-node if it is not already there - node.addBlock(storedBlock); + node.addBlock(b.stored); // Add this replica to corruptReplicas Map - corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason); - if (countNodes(storedBlock).liveReplicas() >= bc.getReplication()) { + corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason); + if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) { // the block is over-replicated so invalidate the replicas immediately - invalidateBlock(storedBlock, node); + invalidateBlock(b, node); } else if (namesystem.isPopulatingReplQueues()) { // add the block to neededReplication - updateNeededReplications(storedBlock, -1, 0); + updateNeededReplications(b.stored, -1, 0); } } /** * Invalidates the given block on the given datanode. */ - private void invalidateBlock(Block blk, DatanodeInfo dn) - throws IOException { - NameNode.stateChangeLog.info("BLOCK* invalidateBlock: " - + blk + " on " + dn); + private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn + ) throws IOException { + NameNode.stateChangeLog.info("BLOCK* invalidateBlock: " + b + " on " + dn); DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot invalidate block " + blk + throw new IOException("Cannot invalidate " + b + " because datanode " + dn + " does not exist."); } // Check how many copies we have of the block - NumberReplicas nr = countNodes(blk); + NumberReplicas nr = countNodes(b.stored); if (nr.replicasOnStaleNodes() > 0) { NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " + - "invalidation of block " + blk + " on " + dn + " because " + + "invalidation of " + b + " on " + dn + " because " + nr.replicasOnStaleNodes() + " replica(s) are located on nodes " + "with potentially out-of-date block reports."); - postponeBlock(blk); + postponeBlock(b.corrupted); } else if (nr.liveReplicas() >= 1) { // If we have at least one copy on a live node, then we can delete it. - addToInvalidates(blk, dn); - removeStoredBlock(blk, node); + addToInvalidates(b.corrupted, dn); + removeStoredBlock(b.stored, node); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: " - + blk + " on " + dn + " listed for deletion."); + + b + " on " + dn + " listed for deletion."); } } else { - NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on " - + dn + " is the only copy and was not deleted."); + NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + b + + " on " + dn + " is the only copy and was not deleted."); } } @@ -1408,14 +1401,37 @@ public class BlockManager { * list of blocks that should be considered corrupt due to a block report. */ private static class BlockToMarkCorrupt { - final BlockInfo blockInfo; + /** The corrupted block in a datanode. */ + final BlockInfo corrupted; + /** The corresponding block stored in the BlockManager. */ + final BlockInfo stored; + /** The reason to mark corrupt. */ final String reason; - BlockToMarkCorrupt(BlockInfo blockInfo, String reason) { - super(); - this.blockInfo = blockInfo; + BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) { + Preconditions.checkNotNull(corrupted, "corrupted is null"); + Preconditions.checkNotNull(stored, "stored is null"); + + this.corrupted = corrupted; + this.stored = stored; this.reason = reason; } + + BlockToMarkCorrupt(BlockInfo stored, String reason) { + this(stored, stored, reason); + } + + BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) { + this(new BlockInfo(stored), stored, reason); + //the corrupted block in datanode has a different generation stamp + corrupted.setGenerationStamp(gs); + } + + @Override + public String toString() { + return corrupted + "(" + + (corrupted == stored? "same as stored": "stored=" + stored) + ")"; + } } /** @@ -1536,7 +1552,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b.blockInfo, node, b.reason); + markBlockAsCorrupt(b, node); } } @@ -1586,7 +1602,7 @@ public class BlockManager { queueReportedBlock(node, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c.blockInfo, node, c.reason); + markBlockAsCorrupt(c, node); } continue; } @@ -1807,7 +1823,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block assert pendingDNMessages.count() == 0; } - /* + /** * The next two methods test the various cases under which we must conclude * the replica is corrupt, or under construction. These are laid out * as switch statements, on the theory that it is easier to understand @@ -1817,7 +1833,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block * @return a BlockToMarkCorrupt object, or null if the replica is not corrupt */ private BlockToMarkCorrupt checkReplicaCorrupt( - Block iblk, ReplicaState reportedState, + Block reported, ReplicaState reportedState, BlockInfo storedBlock, BlockUCState ucState, DatanodeDescriptor dn) { switch(reportedState) { @@ -1825,15 +1841,16 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block switch(ucState) { case COMPLETE: case COMMITTED: - if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) { - return new BlockToMarkCorrupt(storedBlock, - "block is " + ucState + " and reported genstamp " + - iblk.getGenerationStamp() + " does not match " + - "genstamp in block map " + storedBlock.getGenerationStamp()); - } else if (storedBlock.getNumBytes() != iblk.getNumBytes()) { + if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { + final long reportedGS = reported.getGenerationStamp(); + return new BlockToMarkCorrupt(storedBlock, reportedGS, + "block is " + ucState + " and reported genstamp " + reportedGS + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp()); + } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { return new BlockToMarkCorrupt(storedBlock, "block is " + ucState + " and reported length " + - iblk.getNumBytes() + " does not match " + + reported.getNumBytes() + " does not match " + "length in block map " + storedBlock.getNumBytes()); } else { return null; // not corrupt @@ -1845,11 +1862,12 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block case RWR: if (!storedBlock.isComplete()) { return null; // not corrupt - } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) { - return new BlockToMarkCorrupt(storedBlock, - "reported " + reportedState + " replica with genstamp " + - iblk.getGenerationStamp() + " does not match COMPLETE block's " + - "genstamp in block map " + storedBlock.getGenerationStamp()); + } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { + final long reportedGS = reported.getGenerationStamp(); + return new BlockToMarkCorrupt(storedBlock, reportedGS, + "reported " + reportedState + " replica with genstamp " + reportedGS + + " does not match COMPLETE block's genstamp in block map " + + storedBlock.getGenerationStamp()); } else { // COMPLETE block, same genstamp if (reportedState == ReplicaState.RBW) { // If it's a RBW report for a COMPLETE block, it may just be that @@ -1871,8 +1889,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block String msg = "Unexpected replica state " + reportedState + " for block: " + storedBlock + " on " + dn + " size " + storedBlock.getNumBytes(); - // log here at WARN level since this is really a broken HDFS - // invariant + // log here at WARN level since this is really a broken HDFS invariant LOG.warn(msg); return new BlockToMarkCorrupt(storedBlock, msg); } @@ -2075,7 +2092,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(Block blk) { + private void invalidateCorruptReplicas(BlockInfo blk) { Collection nodes = corruptReplicas.getNodes(blk); boolean gotException = false; if (nodes == null) @@ -2085,7 +2102,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]); for (DatanodeDescriptor node : nodesCopy) { try { - invalidateBlock(blk, node); + invalidateBlock(new BlockToMarkCorrupt(blk, null), node); } catch (IOException e) { NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " + "error in deleting bad block " + blk + @@ -2501,7 +2518,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b.blockInfo, node, b.reason); + markBlockAsCorrupt(b, node); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java new file mode 100644 index 00000000000..eacd09b374a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test when RBW block is removed. Invalidation of the corrupted block happens + * and then the under replicated block gets replicated to the datanode. + */ +public class TestRBWBlockInvalidation { + private static NumberReplicas countReplicas(final FSNamesystem namesystem, + ExtendedBlock block) { + return namesystem.getBlockManager().countNodes(block.getLocalBlock()); + } + + /** + * Test when a block's replica is removed from RBW folder in one of the + * datanode, namenode should ask to invalidate that corrupted block and + * schedule replication for one more replica for that under replicated block. + */ + @Test + public void testBlockInvalidationWhenRBWReplicaMissedInDN() + throws IOException, InterruptedException { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 300); + conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .build(); + FSDataOutputStream out = null; + try { + final FSNamesystem namesystem = cluster.getNamesystem(); + FileSystem fs = cluster.getFileSystem(); + Path testPath = new Path(MiniDFSCluster.getBaseDirectory(), "foo1"); + out = fs.create(testPath, (short) 2); + out.writeBytes("HDFS-3157: " + testPath); + out.hsync(); + cluster.startDataNodes(conf, 1, true, null, null, null); + String bpid = namesystem.getBlockPoolId(); + ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, testPath); + Block block = blk.getLocalBlock(); + DataNode dn = cluster.getDataNodes().get(0); + + // Delete partial block and its meta information from the RBW folder + // of first datanode. + File blockFile = DataNodeTestUtils.getBlockFile(dn, bpid, block); + File metaFile = DataNodeTestUtils.getMetaFile(dn, bpid, block); + assertTrue("Could not delete the block file from the RBW folder", + blockFile.delete()); + assertTrue("Could not delete the block meta file from the RBW folder", + metaFile.delete()); + + out.close(); + + // Check datanode has reported the corrupt block. + boolean isCorruptReported = false; + while (!isCorruptReported) { + if (countReplicas(namesystem, blk).corruptReplicas() > 0) { + isCorruptReported = true; + } + Thread.sleep(100); + } + assertEquals("There should be 1 replica in the corruptReplicasMap", 1, + countReplicas(namesystem, blk).corruptReplicas()); + + // Check the block has got replicated to another datanode. + blk = DFSTestUtil.getFirstBlock(fs, testPath); + boolean isReplicated = false; + while (!isReplicated) { + if (countReplicas(namesystem, blk).liveReplicas() > 1) { + isReplicated = true; + } + Thread.sleep(100); + } + assertEquals("There should be two live replicas", 2, countReplicas( + namesystem, blk).liveReplicas()); + + // sleep for 1 second, so that by this time datanode reports the corrupt + // block after a live replica of block got replicated. + Thread.sleep(1000); + + // Check that there is no corrupt block in the corruptReplicasMap. + assertEquals("There should not be any replica in the corruptReplicasMap", + 0, countReplicas(namesystem, blk).corruptReplicas()); + } finally { + if (out != null) { + out.close(); + } + cluster.shutdown(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 74be37d986b..2f9ed12099e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -137,6 +137,11 @@ public class DataNodeTestUtils { return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b); } + public static File getMetaFile(DataNode dn, String bpid, Block b) + throws IOException { + return FsDatasetTestUtil.getMetaFile(dn.getFSDataset(), bpid, b); + } + public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks ) throws IOException { return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java index 211737fa73e..6bd36edd52a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java @@ -37,6 +37,12 @@ public class FsDatasetTestUtil { return ((FsDatasetImpl)fsd).getBlockFile(bpid, b); } + public static File getMetaFile(FsDatasetSpi fsd, String bpid, Block b) + throws IOException { + return FsDatasetUtil.getMetaFile(getBlockFile(fsd, bpid, b), b + .getGenerationStamp()); + } + public static boolean unlinkBlock(FsDatasetSpi fsd, ExtendedBlock block, int numLinks) throws IOException { final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);