From 8fb3e34e740a6259f0f36bf1ae9d812470e09817 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Tue, 13 May 2014 18:41:07 +0000 Subject: [PATCH] HDFS-6186. Merge change r1594314 from trunk. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1594324 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../server/blockmanagement/BlockManager.java | 6 +- .../blockmanagement/InvalidateBlocks.java | 49 +++++- .../TestPendingInvalidateBlock.java | 162 ++++++++++++++++++ 5 files changed, 219 insertions(+), 4 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d8873b42b35..32ca87bf8a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -100,6 +100,8 @@ Release 2.5.0 - UNRELEASED HDFS-6230. Expose upgrade status through NameNode web UI. (Mit Desai via wheat9) + HDFS-6186. Pause deletion of blocks when the namenode starts up. (jing9) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8d63c429455..72c2611d90d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -245,6 +245,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.path.based.cache.refresh.interval.ms"; public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L; + /** Pending period of block deletion since NameNode startup */ + public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY = "dfs.namenode.startup.delay.block.deletion.ms"; + public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT = 0L; + // Whether to enable datanode's stale state detection and usage for reads public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode"; public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 0b4f68d7b7c..6ba2969e3d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -261,7 +261,11 @@ public class BlockManager { this.namesystem = namesystem; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); - invalidateBlocks = new InvalidateBlocks(datanodeManager); + + final long pendingPeriod = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT); + invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod); // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java index f0d5d088dd1..bba9678be1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java @@ -18,18 +18,24 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.PrintWriter; +import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; +import java.util.Calendar; +import java.util.GregorianCalendar; import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; /** * Keeps a Collection for every named machine containing blocks @@ -46,8 +52,28 @@ class InvalidateBlocks { private final DatanodeManager datanodeManager; - InvalidateBlocks(final DatanodeManager datanodeManager) { + /** + * The period of pending time for block invalidation since the NameNode + * startup + */ + private final long pendingPeriodInMs; + /** the startup time */ + private final long startupTime = Time.monotonicNow(); + + InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) { this.datanodeManager = datanodeManager; + this.pendingPeriodInMs = pendingPeriodInMs; + printBlockDeletionTime(BlockManager.LOG); + } + + private void printBlockDeletionTime(final Log log) { + log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY + + " is set to " + pendingPeriodInMs + " ms."); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss"); + Calendar calendar = new GregorianCalendar(); + calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000)); + log.info("The block deletion will start around " + + sdf.format(calendar.getTime())); } /** @return the number of blocks to be invalidated . */ @@ -136,8 +162,25 @@ class InvalidateBlocks { return new ArrayList(node2blocks.keySet()); } + /** + * @return the remianing pending time + */ + @VisibleForTesting + long getInvalidationDelay() { + return pendingPeriodInMs - (Time.monotonicNow() - startupTime); + } + /** Invalidate work for the storage. */ int invalidateWork(final String storageId) { + final long delay = getInvalidationDelay(); + if (delay > 0) { + if (BlockManager.LOG.isDebugEnabled()) { + BlockManager.LOG + .debug("Block deletion is delayed during NameNode startup. " + + "The deletion will start after " + delay + " ms."); + } + return 0; + } final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId); if (dn == null) { remove(storageId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java new file mode 100644 index 00000000000..35e27b0878f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; + +/** + * Test if we can correctly delay the deletion of blocks. + */ +public class TestPendingInvalidateBlock { + { + ((Log4JLogger)BlockManager.LOG).getLogger().setLevel(Level.DEBUG); + } + + private static final int BLOCKSIZE = 1024; + private static final short REPLICATION = 2; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); + // block deletion pending period + conf.setLong(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY, 1000 * 5); + // set the block report interval to 2s + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 2000); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + // disable the RPC timeout for debug + conf.setLong(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 0); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION) + .build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testPendingDeletion() throws Exception { + final Path foo = new Path("/foo"); + DFSTestUtil.createFile(dfs, foo, BLOCKSIZE, REPLICATION, 0); + // restart NN + cluster.restartNameNode(true); + dfs.delete(foo, true); + Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal()); + Assert.assertEquals(REPLICATION, cluster.getNamesystem() + .getPendingDeletionBlocks()); + Thread.sleep(6000); + Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal()); + Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks()); + } + + /** + * Test whether we can delay the deletion of unknown blocks in DataNode's + * first several block reports. + */ + @Test + public void testPendingDeleteUnknownBlocks() throws Exception { + final int fileNum = 5; // 5 files + final Path[] files = new Path[fileNum]; + final DataNodeProperties[] dnprops = new DataNodeProperties[REPLICATION]; + // create a group of files, each file contains 1 block + for (int i = 0; i < fileNum; i++) { + files[i] = new Path("/file" + i); + DFSTestUtil.createFile(dfs, files[i], BLOCKSIZE, REPLICATION, i); + } + // wait until all DataNodes have replicas + waitForReplication(); + for (int i = REPLICATION - 1; i >= 0; i--) { + dnprops[i] = cluster.stopDataNode(i); + } + Thread.sleep(2000); + // delete 2 files, we still have 3 files remaining so that we can cover + // every DN storage + for (int i = 0; i < 2; i++) { + dfs.delete(files[i], true); + } + + // restart NameNode + cluster.restartNameNode(false); + InvalidateBlocks invalidateBlocks = (InvalidateBlocks) Whitebox + .getInternalState(cluster.getNamesystem().getBlockManager(), + "invalidateBlocks"); + InvalidateBlocks mockIb = Mockito.spy(invalidateBlocks); + Mockito.doReturn(1L).when(mockIb).getInvalidationDelay(); + Whitebox.setInternalState(cluster.getNamesystem().getBlockManager(), + "invalidateBlocks", mockIb); + + Assert.assertEquals(0L, cluster.getNamesystem().getPendingDeletionBlocks()); + // restart DataNodes + for (int i = 0; i < REPLICATION; i++) { + cluster.restartDataNode(dnprops[i], true); + } + cluster.waitActive(); + + for (int i = 0; i < REPLICATION; i++) { + DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(i)); + } + Thread.sleep(2000); + // make sure we have received block reports by checking the total block # + Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal()); + Assert.assertEquals(4, cluster.getNamesystem().getPendingDeletionBlocks()); + + cluster.restartNameNode(true); + Thread.sleep(6000); + Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal()); + Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks()); + } + + private long waitForReplication() throws Exception { + for (int i = 0; i < 10; i++) { + long ur = cluster.getNamesystem().getUnderReplicatedBlocks(); + if (ur == 0) { + return 0; + } else { + Thread.sleep(1000); + } + } + return cluster.getNamesystem().getUnderReplicatedBlocks(); + } + +}