HDFS-6186. Merge change r1594314 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1594324 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f8dad8a751
commit
8fb3e34e74
|
@ -100,6 +100,8 @@ Release 2.5.0 - UNRELEASED
|
||||||
HDFS-6230. Expose upgrade status through NameNode web UI.
|
HDFS-6230. Expose upgrade status through NameNode web UI.
|
||||||
(Mit Desai via wheat9)
|
(Mit Desai via wheat9)
|
||||||
|
|
||||||
|
HDFS-6186. Pause deletion of blocks when the namenode starts up. (jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
|
|
@ -245,6 +245,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.namenode.path.based.cache.refresh.interval.ms";
|
"dfs.namenode.path.based.cache.refresh.interval.ms";
|
||||||
public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
|
public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
|
||||||
|
|
||||||
|
/** Pending period of block deletion since NameNode startup */
|
||||||
|
public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY = "dfs.namenode.startup.delay.block.deletion.ms";
|
||||||
|
public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT = 0L;
|
||||||
|
|
||||||
// Whether to enable datanode's stale state detection and usage for reads
|
// Whether to enable datanode's stale state detection and usage for reads
|
||||||
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
|
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
|
||||||
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
|
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
|
||||||
|
|
|
@ -261,7 +261,11 @@ public class BlockManager {
|
||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||||
invalidateBlocks = new InvalidateBlocks(datanodeManager);
|
|
||||||
|
final long pendingPeriod = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
|
||||||
|
invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod);
|
||||||
|
|
||||||
// Compute the map capacity by allocating 2% of total memory
|
// Compute the map capacity by allocating 2% of total memory
|
||||||
blocksMap = new BlocksMap(
|
blocksMap = new BlocksMap(
|
||||||
|
|
|
@ -18,18 +18,24 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.Calendar;
|
||||||
import java.util.Iterator;
|
import java.util.GregorianCalendar;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Keeps a Collection for every named machine containing blocks
|
* Keeps a Collection for every named machine containing blocks
|
||||||
|
@ -46,8 +52,28 @@ class InvalidateBlocks {
|
||||||
|
|
||||||
private final DatanodeManager datanodeManager;
|
private final DatanodeManager datanodeManager;
|
||||||
|
|
||||||
InvalidateBlocks(final DatanodeManager datanodeManager) {
|
/**
|
||||||
|
* The period of pending time for block invalidation since the NameNode
|
||||||
|
* startup
|
||||||
|
*/
|
||||||
|
private final long pendingPeriodInMs;
|
||||||
|
/** the startup time */
|
||||||
|
private final long startupTime = Time.monotonicNow();
|
||||||
|
|
||||||
|
InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) {
|
||||||
this.datanodeManager = datanodeManager;
|
this.datanodeManager = datanodeManager;
|
||||||
|
this.pendingPeriodInMs = pendingPeriodInMs;
|
||||||
|
printBlockDeletionTime(BlockManager.LOG);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printBlockDeletionTime(final Log log) {
|
||||||
|
log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY
|
||||||
|
+ " is set to " + pendingPeriodInMs + " ms.");
|
||||||
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
|
||||||
|
Calendar calendar = new GregorianCalendar();
|
||||||
|
calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000));
|
||||||
|
log.info("The block deletion will start around "
|
||||||
|
+ sdf.format(calendar.getTime()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return the number of blocks to be invalidated . */
|
/** @return the number of blocks to be invalidated . */
|
||||||
|
@ -136,8 +162,25 @@ class InvalidateBlocks {
|
||||||
return new ArrayList<String>(node2blocks.keySet());
|
return new ArrayList<String>(node2blocks.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the remaining pending time
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
long getInvalidationDelay() {
|
||||||
|
return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
|
||||||
|
}
|
||||||
|
|
||||||
/** Invalidate work for the storage. */
|
/** Invalidate work for the storage. */
|
||||||
int invalidateWork(final String storageId) {
|
int invalidateWork(final String storageId) {
|
||||||
|
final long delay = getInvalidationDelay();
|
||||||
|
if (delay > 0) {
|
||||||
|
if (BlockManager.LOG.isDebugEnabled()) {
|
||||||
|
BlockManager.LOG
|
||||||
|
.debug("Block deletion is delayed during NameNode startup. "
|
||||||
|
+ "The deletion will start after " + delay + " ms.");
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
|
final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
|
||||||
if (dn == null) {
|
if (dn == null) {
|
||||||
remove(storageId);
|
remove(storageId);
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if we can correctly delay the deletion of blocks.
|
||||||
|
*/
|
||||||
|
public class TestPendingInvalidateBlock {
|
||||||
|
{
|
||||||
|
((Log4JLogger)BlockManager.LOG).getLogger().setLevel(Level.DEBUG);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final int BLOCKSIZE = 1024;
|
||||||
|
private static final short REPLICATION = 2;
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
private DistributedFileSystem dfs;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
|
||||||
|
// block deletion pending period
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY, 1000 * 5);
|
||||||
|
// set the block report interval to 2s
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 2000);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
|
||||||
|
// disable the RPC timeout for debug
|
||||||
|
conf.setLong(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 0);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
dfs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPendingDeletion() throws Exception {
|
||||||
|
final Path foo = new Path("/foo");
|
||||||
|
DFSTestUtil.createFile(dfs, foo, BLOCKSIZE, REPLICATION, 0);
|
||||||
|
// restart NN
|
||||||
|
cluster.restartNameNode(true);
|
||||||
|
dfs.delete(foo, true);
|
||||||
|
Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal());
|
||||||
|
Assert.assertEquals(REPLICATION, cluster.getNamesystem()
|
||||||
|
.getPendingDeletionBlocks());
|
||||||
|
Thread.sleep(6000);
|
||||||
|
Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal());
|
||||||
|
Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether we can delay the deletion of unknown blocks in DataNode's
|
||||||
|
* first several block reports.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testPendingDeleteUnknownBlocks() throws Exception {
|
||||||
|
final int fileNum = 5; // 5 files
|
||||||
|
final Path[] files = new Path[fileNum];
|
||||||
|
final DataNodeProperties[] dnprops = new DataNodeProperties[REPLICATION];
|
||||||
|
// create a group of files, each file contains 1 block
|
||||||
|
for (int i = 0; i < fileNum; i++) {
|
||||||
|
files[i] = new Path("/file" + i);
|
||||||
|
DFSTestUtil.createFile(dfs, files[i], BLOCKSIZE, REPLICATION, i);
|
||||||
|
}
|
||||||
|
// wait until all DataNodes have replicas
|
||||||
|
waitForReplication();
|
||||||
|
for (int i = REPLICATION - 1; i >= 0; i--) {
|
||||||
|
dnprops[i] = cluster.stopDataNode(i);
|
||||||
|
}
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// delete 2 files, we still have 3 files remaining so that we can cover
|
||||||
|
// every DN storage
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
dfs.delete(files[i], true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// restart NameNode
|
||||||
|
cluster.restartNameNode(false);
|
||||||
|
InvalidateBlocks invalidateBlocks = (InvalidateBlocks) Whitebox
|
||||||
|
.getInternalState(cluster.getNamesystem().getBlockManager(),
|
||||||
|
"invalidateBlocks");
|
||||||
|
InvalidateBlocks mockIb = Mockito.spy(invalidateBlocks);
|
||||||
|
Mockito.doReturn(1L).when(mockIb).getInvalidationDelay();
|
||||||
|
Whitebox.setInternalState(cluster.getNamesystem().getBlockManager(),
|
||||||
|
"invalidateBlocks", mockIb);
|
||||||
|
|
||||||
|
Assert.assertEquals(0L, cluster.getNamesystem().getPendingDeletionBlocks());
|
||||||
|
// restart DataNodes
|
||||||
|
for (int i = 0; i < REPLICATION; i++) {
|
||||||
|
cluster.restartDataNode(dnprops[i], true);
|
||||||
|
}
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
for (int i = 0; i < REPLICATION; i++) {
|
||||||
|
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(i));
|
||||||
|
}
|
||||||
|
Thread.sleep(2000);
|
||||||
|
// make sure we have received block reports by checking the total block #
|
||||||
|
Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal());
|
||||||
|
Assert.assertEquals(4, cluster.getNamesystem().getPendingDeletionBlocks());
|
||||||
|
|
||||||
|
cluster.restartNameNode(true);
|
||||||
|
Thread.sleep(6000);
|
||||||
|
Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal());
|
||||||
|
Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks());
|
||||||
|
}
|
||||||
|
|
||||||
|
private long waitForReplication() throws Exception {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
long ur = cluster.getNamesystem().getUnderReplicatedBlocks();
|
||||||
|
if (ur == 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cluster.getNamesystem().getUnderReplicatedBlocks();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue