HDFS-3368. Missing blocks due to bad DataNodes coming up and down.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1342512 13f79535-47bb-0310-9956-ffa450edef68
commit 2e0a15c488 (parent 458be39423)
CHANGES.txt
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
     HDFS-3163. TestHDFSCLI.testAll fails if the user name is not all lowercase.
     (Brandon Li via atm)
 
+    HDFS-3368. Missing blocks due to bad DataNodes coming up and down. (shv)
+
 Release 2.0.1-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
DFSConfigKeys.java
@@ -107,6 +107,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
   public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
   public static final int DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
+  public static final String DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
+  public static final int DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4;
   public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
   public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
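
The new key can also be set programmatically rather than in hdfs-site.xml. A minimal sketch, assuming only the key name and default introduced above; the class name ToleranceConfigSketch, the method name, and the value 6 are purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ToleranceConfigSketch {
      public static Configuration withRelaxedTolerance() {
        Configuration conf = new HdfsConfiguration();
        // Tolerate 6 missed heartbeat intervals (default is 4) before the
        // replica-deletion policy prefers a node as the deletion target.
        conf.setInt(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, 6);
        return conf;
      }
    }
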
BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -49,13 +52,19 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
|
private static final String enableDebugLogging =
|
||||||
|
"For more information, please enable DEBUG log level on "
|
||||||
|
+ ((Log4JLogger)LOG).getLogger().getName();
|
||||||
|
|
||||||
private boolean considerLoad;
|
private boolean considerLoad;
|
||||||
private boolean preferLocalNode = true;
|
private boolean preferLocalNode = true;
|
||||||
private NetworkTopology clusterMap;
|
private NetworkTopology clusterMap;
|
||||||
private FSClusterStats stats;
|
private FSClusterStats stats;
|
||||||
static final String enableDebugLogging = "For more information, please enable"
|
private long heartbeatInterval; // interval for DataNode heartbeats
|
||||||
+ " DEBUG level logging on the "
|
/**
|
||||||
+ "org.apache.hadoop.hdfs.server.namenode.FSNamesystem logger.";
|
* A miss of that many heartbeats is tolerated for replica deletion policy.
|
||||||
|
*/
|
||||||
|
private int tolerateHeartbeatMultiplier;
|
||||||
|
|
||||||
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
|
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
|
||||||
NetworkTopology clusterMap) {
|
NetworkTopology clusterMap) {
|
||||||
|
@ -71,6 +80,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
|
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
|
||||||
this.stats = stats;
|
this.stats = stats;
|
||||||
this.clusterMap = clusterMap;
|
this.clusterMap = clusterMap;
|
||||||
|
this.heartbeatInterval = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
||||||
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
|
||||||
|
this.tolerateHeartbeatMultiplier = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ThreadLocal<StringBuilder> threadLocalBuilder =
|
private ThreadLocal<StringBuilder> threadLocalBuilder =
|
||||||
|
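
For a sense of scale: assuming the stock defaults (dfs.heartbeat.interval of 3 seconds and the new multiplier of 4), heartbeatInterval * tolerateHeartbeatMultiplier works out to 3000 ms * 4 = 12000 ms, so the cut-off computed in the next hunk treats a replica whose DataNode has not heartbeated for more than 12 seconds as the preferred deletion target.
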
@@ -551,24 +566,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                                  short replicationFactor,
                                                  Collection<DatanodeDescriptor> first,
                                                  Collection<DatanodeDescriptor> second) {
+    long oldestHeartbeat =
+      now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+    DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor cur = null;
+    DatanodeDescriptor minSpaceNode = null;
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
     Iterator<DatanodeDescriptor> iter =
       first.isEmpty() ? second.iterator() : first.iterator();
 
-    // pick node with least free space
+    // Pick the node with the oldest heartbeat or with the least free space,
+    // if all hearbeats are within the tolerable heartbeat interval
     while (iter.hasNext() ) {
       DatanodeDescriptor node = iter.next();
       long free = node.getRemaining();
+      long lastHeartbeat = node.getLastUpdate();
+      if(lastHeartbeat < oldestHeartbeat) {
+        oldestHeartbeat = lastHeartbeat;
+        oldestHeartbeatNode = node;
+      }
       if (minSpace > free) {
         minSpace = free;
-        cur = node;
+        minSpaceNode = node;
       }
     }
-    return cur;
+    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
 
   @VisibleForTesting
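
Read in isolation, the selection rule above is: prefer the replica on the node whose heartbeat is stale beyond heartbeatInterval * tolerateHeartbeatMultiplier (picking the stalest such node), and fall back to the node with the least remaining space only when every candidate is heartbeating within the tolerated window. A self-contained sketch of that rule, with a hypothetical Candidate type standing in for DatanodeDescriptor (its getLastUpdate()/getRemaining()); the class and method names here are illustrative, not part of the patch:

    import java.util.List;

    class ReplicaToDeleteSketch {
      static class Candidate {
        final String name;
        final long lastHeartbeatMs;  // stand-in for DatanodeDescriptor.getLastUpdate()
        final long remainingBytes;   // stand-in for DatanodeDescriptor.getRemaining()
        Candidate(String name, long lastHeartbeatMs, long remainingBytes) {
          this.name = name;
          this.lastHeartbeatMs = lastHeartbeatMs;
          this.remainingBytes = remainingBytes;
        }
      }

      static Candidate choose(List<Candidate> candidates, long nowMs,
                              long heartbeatIntervalMs, int tolerateMultiplier) {
        long oldestHeartbeat = nowMs - heartbeatIntervalMs * tolerateMultiplier;
        Candidate oldestHeartbeatNode = null;
        long minSpace = Long.MAX_VALUE;
        Candidate minSpaceNode = null;
        for (Candidate node : candidates) {
          // Track the stalest node among those past the tolerated window.
          if (node.lastHeartbeatMs < oldestHeartbeat) {
            oldestHeartbeat = node.lastHeartbeatMs;
            oldestHeartbeatNode = node;
          }
          // Track the node with the least free space as the fallback.
          if (node.remainingBytes < minSpace) {
            minSpace = node.remainingBytes;
            minSpaceNode = node;
          }
        }
        // A stale node wins over the least-free-space node, as in the patch.
        return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
      }
    }
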
TestOverReplicatedBlocks.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
 import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@ -32,11 +35,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||||
import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
|
import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestOverReplicatedBlocks {
|
public class TestOverReplicatedBlocks {
|
||||||
|
@@ -116,6 +122,77 @@ public class TestOverReplicatedBlocks {
       cluster.shutdown();
     }
   }
 
+  static final long SMALL_BLOCK_SIZE =
+      DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+  static final long SMALL_FILE_LENGTH = SMALL_BLOCK_SIZE * 4;
+
+  /**
+   * The test verifies that replica for deletion is chosen on a node,
+   * with the oldest heartbeat, when this heartbeat is larger than the
+   * tolerable heartbeat interval.
+   * It creates a file with several blocks and replication 4.
+   * The last DN is configured to send heartbeats rarely.
+   *
+   * Test waits until the tolerable heartbeat interval expires, and reduces
+   * replication of the file. All replica deletions should be scheduled for the
+   * last node. No replicas will actually be deleted, since last DN doesn't
+   * send heartbeats.
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws IOException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, SMALL_BLOCK_SIZE);
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      fs = cluster.getFileSystem();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+
+      conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300);
+      cluster.startDataNodes(conf, 1, true, null, null, null);
+      DataNode lastDN = cluster.getDataNodes().get(3);
+      DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+          lastDN, namesystem.getBlockPoolId());
+      String lastDNid = dnReg.getStorageID();
+
+      final Path fileName = new Path("/foo2");
+      DFSTestUtil.createFile(fs, fileName, SMALL_FILE_LENGTH, (short)4, 0L);
+      DFSTestUtil.waitReplication(fs, fileName, (short)4);
+
+      // Wait for tolerable number of heartbeats plus one
+      DatanodeDescriptor nodeInfo = null;
+      long lastHeartbeat = 0;
+      long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
+        (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
+      do {
+        nodeInfo =
+          namesystem.getBlockManager().getDatanodeManager().getDatanode(dnReg);
+        lastHeartbeat = nodeInfo.getLastUpdate();
+      } while(now() - lastHeartbeat < waitTime);
+      fs.setReplication(fileName, (short)3);
+
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
+
+      // All replicas for deletion should be scheduled on lastDN.
+      // And should not actually be deleted, because lastDN does not heartbeat.
+      namesystem.readLock();
+      Collection<Block> dnBlocks =
+        namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
+      assertEquals("Replicas on node " + lastDNid + " should have been deleted",
+          SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
+      namesystem.readUnlock();
+      for(BlockLocation location : locs)
+        assertEquals("Block should still have 4 replicas",
+            4, location.getNames().length);
+    } finally {
+      if(fs != null) fs.close();
+      if(cluster != null) cluster.shutdown();
+    }
+  }
+
 /**
  * Test over replicated block should get invalidated when decreasing the
  * replication for a partial block.
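
Assuming a standard Maven/Surefire setup in the hadoop-hdfs module, the new test can typically be run on its own with something like:

    mvn test -Dtest=TestOverReplicatedBlocks

Note that the do/while loop above deliberately spins until the last DataNode's heartbeat is older than the tolerated window, so the test is expected to take on the order of the default heartbeat interval times (multiplier + 1) before it reduces the replication factor and checks excessReplicateMap.
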