HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee.
commit d1e6b6db7c
parent 993ab15b39
@@ -1766,7 +1766,8 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
-   * We do not use already decommissioned nodes as a source.
+   * We do not use already decommissioned nodes as a source, unless there is no
+   * other choice.
    * Otherwise we choose a random node among those that did not reach their
    * replication limits. However, if the replication is of the highest priority
    * and all nodes have reached their replication limits, we will choose a
@@ -1798,6 +1799,7 @@ DatanodeDescriptor chooseSourceDatanode(Block block,
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
+    DatanodeDescriptor decommissionedSrc = null;
     int live = 0;
     int readonly = 0;
     int decommissioned = 0;
@@ -1844,9 +1846,16 @@ else if (node.isDecommissionInProgress()) {
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
-      // never use already decommissioned nodes
-      if(node.isDecommissioned())
+      // Save the live decommissioned replica in case we need it. Such replicas
+      // are normally not used for replication, but if nothing else is
+      // available, one can be selected as a source.
+      if (node.isDecommissioned()) {
+        if (decommissionedSrc == null ||
+            ThreadLocalRandom.current().nextBoolean()) {
+          decommissionedSrc = node;
+        }
         continue;
+      }

       // We got this far, current node is a reasonable choice
       if (srcNode == null) {
@@ -1862,6 +1871,10 @@ else if (node.isDecommissionInProgress()) {
     if(numReplicas != null)
       numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
           excess, 0);
+    // Pick a live decommissioned replica, if nothing else is available.
+    if (live == 0 && srcNode == null && decommissionedSrc != null) {
+      return decommissionedSrc;
+    }
    return srcNode;
   }

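The two hunks above change chooseSourceDatanode so that a live replica on an already decommissioned node is remembered as a last-resort source: one such node is kept at random (each new candidate replaces the kept one with probability 1/2 via ThreadLocalRandom) and is returned only when there is no live replica and no other usable source. The following is a minimal, self-contained sketch of that keep-one-at-random fallback pattern; the Node type and pickSource() method are illustrative stand-ins, not the actual BlockManager API.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the fallback-source selection pattern used above.
// "Node" and pickSource() are hypothetical; the real logic lives in
// BlockManager.chooseSourceDatanode().
class SourcePickerSketch {
  static class Node {
    boolean decommissioned;
    boolean busy;   // stand-in for "reached its replication limit"
  }

  static Node pickSource(List<Node> replicas) {
    Node src = null;               // preferred: a usable, non-decommissioned node
    Node decommissionedSrc = null; // last resort: a decommissioned node
    for (Node node : replicas) {
      if (node.decommissioned) {
        // Keep one decommissioned candidate, replacing the current one with
        // probability 1/2. Cheap, not uniformly random, but avoids always
        // keeping the first such node encountered.
        if (decommissionedSrc == null
            || ThreadLocalRandom.current().nextBoolean()) {
          decommissionedSrc = node;
        }
        continue;
      }
      if (!node.busy && src == null) {
        src = node;
      }
    }
    // Fall back to the decommissioned replica only when nothing else exists,
    // so a block whose only copies sit on dead, decommissioned nodes can
    // still be re-replicated instead of being permanently lost.
    return src != null ? src : decommissionedSrc;
  }
}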
@@ -2677,7 +2690,7 @@ private Block addStoredBlock(final BlockInfo block,

     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
-      curReplicaDelta = 1;
+      curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
       if (logEveryBlock) {
         blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
             node, storedBlock, storedBlock.getNumBytes());
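The one-line change above stops a replica reported by an already decommissioned node from bumping the live-replica count when it is first recorded; otherwise the block can look adequately replicated and never be copied back onto live nodes. A small illustration of that accounting follows; the helper and its caller are hypothetical, the real bookkeeping is inside BlockManager.addStoredBlock().

// Hypothetical helper mirroring the delta computation above: a newly added
// replica only counts toward the live-replica total when the reporting node
// is not already decommissioned.
final class ReplicaDeltaSketch {
  static int replicaDelta(boolean newlyAdded, boolean nodeDecommissioned) {
    if (!newlyAdded) {
      return 0;                        // replica was already known
    }
    return nodeDecommissioned ? 0 : 1; // decommissioned copies are not "live"
  }

  public static void main(String[] args) {
    int live = 0;
    live += replicaDelta(true, true);  // block report from a decommissioned node
    live += replicaDelta(true, false); // block report from a live node
    System.out.println("live replicas counted: " + live); // prints 1
  }
}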
@@ -310,9 +310,9 @@ synchronized void update(BlockInfo block, int curReplicas,
           " curPri " + curPri +
           " oldPri " + oldPri);
     }
-    if(oldPri != curPri) {
-      remove(block, oldPri);
-    }
+    // oldPri is mostly correct, but not always. If not found with oldPri,
+    // other levels will be searched until the block is found & removed.
+    remove(block, oldPri);
     if(priorityQueues.get(curPri).add(block)) {
       NameNode.blockStateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +
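This hunk stops trusting the computed old priority in UnderReplicatedBlocks.update(): the block is now always removed, with oldPri used only as a hint, relying on remove() to fall back to scanning the other priority levels when the block is not found at the hinted one. Below is a minimal sketch of that hint-then-scan removal, assuming a plain list of per-priority sets; the names are illustrative, not the actual UnderReplicatedBlocks implementation.

import java.util.List;
import java.util.Set;

// Illustrative hint-then-scan removal: try the expected priority level first,
// then search every level, so a stale priority hint cannot leave the block
// stranded in the wrong queue.
final class PriorityRemoveSketch {
  static <B> boolean remove(List<Set<B>> queues, B block, int priHint) {
    if (priHint >= 0 && priHint < queues.size()
        && queues.get(priHint).remove(block)) {
      return true;                  // the hint was correct (common case)
    }
    for (Set<B> level : queues) {   // fallback: the hint was stale
      if (level.remove(block)) {
        return true;
      }
    }
    return false;                   // block was not queued at all
  }
}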
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -75,6 +76,7 @@ public class TestDecommissioningStatus {
   private static FileSystem localFileSys;
   private static Configuration conf;
   private static Path dir;
+  private static Logger LOG;

   final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);

@@ -98,7 +100,7 @@ public void setUp() throws Exception {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         4);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);

@@ -111,6 +113,7 @@ public void setUp() throws Exception {
     cluster.getNamesystem().getBlockManager().getDatanodeManager()
         .setHeartbeatExpireInterval(3000);
     Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+    LOG = Logger.getLogger(TestDecommissioningStatus.class);
   }

   @After
@@ -431,4 +434,110 @@ public void testDecommissionDeadDN() throws Exception {
     writeConfigFile(localFileSys, excludeFile, null);
     dm.refreshNodes(conf);
   }
+
+  @Test(timeout=120000)
+  public void testDecommissionLosingData() throws Exception {
+    ArrayList<String> nodes = new ArrayList<String>(2);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+    DatanodeManager dm = bm.getDatanodeManager();
+    Path file1 = new Path("decommissionLosingData.dat");
+    writeFile(fileSys, file1, (short)numDatanodes);
+    Thread.sleep(1000);
+
+    // Shutdown dn1
+    LOG.info("Shutdown dn1");
+    DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    String dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Shutdown dn0
+    LOG.info("Shutdown dn0");
+    dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Decommission the nodes.
+    LOG.info("Decommissioning nodes");
+    writeConfigFile(localFileSys, excludeFile, nodes);
+    dm.refreshNodes(conf);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    assertTrue(dnDescriptor0.isDecommissioned());
+    assertTrue(dnDescriptor1.isDecommissioned());
+
+    // All nodes are dead and decommissioned. Blocks should be missing.
+    long missingBlocks = bm.getMissingBlocksCount();
+    long underreplicated = bm.getUnderReplicatedBlocksCount();
+    assertTrue(missingBlocks > 0);
+    assertTrue(underreplicated > 0);
+
+    // Bring back dn0
+    LOG.info("Bring back dn0");
+    cluster.restartDataNode(stoppedDN0, true);
+    do {
+      dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor0 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor0.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Bring back dn1
+    LOG.info("Bring back dn1");
+    cluster.restartDataNode(stoppedDN1, true);
+    do {
+      dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor1 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor1.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Blocks should still be under-replicated
+    Thread.sleep(2000); // Let replication monitor run
+    assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+
+    // Start up two more nodes.
+    LOG.info("Starting two more nodes");
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+    // Replication should fix it.
+    int count = 0;
+    while((bm.getUnderReplicatedBlocksCount() > 0 ||
+        bm.getPendingReplicationBlocksCount() > 0) &&
+        count++ < 10) {
+      Thread.sleep(1000);
+    }
+
+    assertEquals(0, bm.getUnderReplicatedBlocksCount());
+    assertEquals(0, bm.getPendingReplicationBlocksCount());
+    assertEquals(0, bm.getMissingBlocksCount());
+
+    // Shutdown the extra nodes.
+    dnID = cluster.getDataNodes().get(3).getDatanodeId();
+    cluster.stopDataNode(3);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    dnID = cluster.getDataNodes().get(2).getDatanodeId();
+    cluster.stopDataNode(2);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Call refreshNodes on FSNamesystem with empty exclude file to remove the
+    // datanodes from the decommissioning list and make them available again.
+    writeConfigFile(localFileSys, excludeFile, null);
+    dm.refreshNodes(conf);
+    fileSys.delete(file1, false);
+  }
 }