HDFS-7374. Allow decommissioning of dead DataNodes. Contributed by Zhe Zhang.

(cherry picked from commit 5bd048e837)

parent 2e15754a92
commit fbce4df711

@@ -201,6 +201,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7225. Remove stale block invalidation work when DN re-registers with
     different UUID. (Zhe Zhang and Andrew Wang)
 
+    HDFS-7374. Allow decommissioning of dead DataNodes. (Zhe Zhang)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

@@ -844,7 +844,11 @@ public class DatanodeManager {
   @InterfaceAudience.Private
   @VisibleForTesting
   public void startDecommission(DatanodeDescriptor node) {
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+    if (!node.isDecommissionInProgress()) {
+      if (!node.isAlive) {
+        LOG.info("Dead node " + node + " is decommissioned immediately.");
+        node.setDecommissioned();
+      } else if (!node.isDecommissioned()) {
         for (DatanodeStorageInfo storage : node.getStorageInfos()) {
           LOG.info("Start Decommissioning " + node + " " + storage
               + " with " + storage.numBlocks() + " blocks");

@@ -856,6 +860,7 @@ public class DatanodeManager {
         checkDecommissionState(node);
       }
     }
+  }
 
   /** Stop decommissioning the specified datanodes. */
   void stopDecommission(DatanodeDescriptor node) {

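The two hunks above are easier to follow merged. A reconstructed view of the whole method in DatanodeManager.java after this change (the body between the hunks is not shown by the diff and is elided below):

    public void startDecommission(DatanodeDescriptor node) {
      if (!node.isDecommissionInProgress()) {
        if (!node.isAlive) {
          // A dead node has no live replicas to drain, so it can be
          // marked decommissioned immediately.
          LOG.info("Dead node " + node + " is decommissioned immediately.");
          node.setDecommissioned();
        } else if (!node.isDecommissioned()) {
          for (DatanodeStorageInfo storage : node.getStorageInfos()) {
            LOG.info("Start Decommissioning " + node + " " + storage
                + " with " + storage.numBlocks() + " blocks");
          }
          // ... lines elided by the diff ...
          checkDecommissionState(node);
        }
      }
    }
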
@@ -1008,14 +1013,13 @@ public class DatanodeManager {
 
       // register new datanode
       addDatanode(nodeDescr);
-      checkDecommissioning(nodeDescr);
-
       // also treat the registration message as a heartbeat
       // no need to update its timestamp
       // because its is done when the descriptor is created
       heartbeatManager.addDatanode(nodeDescr);
-      success = true;
       incrementVersionCount(nodeReg.getSoftwareVersion());
+      checkDecommissioning(nodeDescr);
+      success = true;
     } finally {
       if (!success) {
         removeDatanode(nodeDescr);

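Why the reordering matters (a reading of the change, not stated in the commit message): checkDecommissioning(nodeDescr) can now mark a dead node decommissioned during registration, so it runs after the heartbeat and version bookkeeping, and success = true moves to the very last step. In sketch form, assembled from the hunk above with the surrounding method body elided:

    try {
      // register new datanode
      addDatanode(nodeDescr);
      heartbeatManager.addDatanode(nodeDescr);
      incrementVersionCount(nodeReg.getSoftwareVersion());
      checkDecommissioning(nodeDescr); // may decommission a dead node on the spot
      success = true;                  // set only after every step that can throw
    } finally {
      if (!success) {
        removeDatanode(nodeDescr);     // roll back the partial registration
      }
    }
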
@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -1636,4 +1635,23 @@ public class DFSTestUtil {
     LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
         new LayoutVersion.LayoutFeature[] { feature });
   }
 
+  /**
+   * Wait for datanode to reach alive or dead state for waitTime given in
+   * milliseconds.
+   */
+  public static void waitForDatanodeState(
+      final MiniDFSCluster cluster, final String nodeID,
+      final boolean alive, int waitTime)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        FSNamesystem namesystem = cluster.getNamesystem();
+        final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
+            namesystem, nodeID);
+        return (dd.isAlive == alive);
+      }
+    }, 100, waitTime);
+  }
 }

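The new shared helper polls through GenericTestUtils.waitFor every 100 ms and fails with TimeoutException once waitTime milliseconds elapse without the datanode reaching the requested state. A typical call, mirroring the TestDeadDatanode and TestDecommissioningStatus hunks below:

    // Stop the first datanode, then block until the namenode reports it
    // dead, checking every 100 ms for at most 20 seconds.
    DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
    cluster.stopDataNode(0);
    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 20000);
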
@@ -21,17 +21,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;

@@ -43,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Test;
 

@@ -60,29 +57,6 @@ public class TestDeadDatanode {
     cluster.shutdown();
   }
 
-  /**
-   * wait for datanode to reach alive or dead state for waitTime given in
-   * milliseconds.
-   */
-  private void waitForDatanodeState(String nodeID, boolean alive, int waitTime)
-      throws TimeoutException, InterruptedException {
-    long stopTime = Time.now() + waitTime;
-    FSNamesystem namesystem = cluster.getNamesystem();
-    String state = alive ? "alive" : "dead";
-    while (Time.now() < stopTime) {
-      final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
-          namesystem, nodeID);
-      if (dd.isAlive == alive) {
-        LOG.info("datanode " + nodeID + " is " + state);
-        return;
-      }
-      LOG.info("Waiting for datanode " + nodeID + " to become " + state);
-      Thread.sleep(1000);
-    }
-    throw new TimeoutException("Timedout waiting for datanode reach state "
-        + state);
-  }
-
   /**
    * Test to ensure namenode rejects request from dead datanode
    * - Start a cluster

@@ -104,11 +78,11 @@ public class TestDeadDatanode {
     DatanodeRegistration reg =
       DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
 
-    waitForDatanodeState(reg.getDatanodeUuid(), true, 20000);
+    DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), true, 20000);
 
     // Shutdown and wait for datanode to be marked dead
     dn.shutdown();
-    waitForDatanodeState(reg.getDatanodeUuid(), false, 20000);
+    DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(), false, 20000);
 
     DatanodeProtocol dnp = cluster.getNameNodeRpc();
 

@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;

|
@ -372,4 +373,35 @@ public class TestDecommissioningStatus {
|
||||||
dm.refreshNodes(conf);
|
dm.refreshNodes(conf);
|
||||||
cleanupFile(fileSys, f);
|
cleanupFile(fileSys, f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify the support for decommissioning a datanode that is already dead.
|
||||||
|
* Under this scenario the datanode should immediately be marked as
|
||||||
|
* DECOMMISSIONED
|
||||||
|
*/
|
||||||
|
@Test(timeout=120000)
|
||||||
|
public void testDecommissionDeadDN()
|
||||||
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
|
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
|
||||||
|
String dnName = dnID.getXferAddr();
|
||||||
|
DataNodeProperties stoppedDN = cluster.stopDataNode(0);
|
||||||
|
DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
|
||||||
|
false, 30000);
|
||||||
|
FSNamesystem fsn = cluster.getNamesystem();
|
||||||
|
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
|
||||||
|
DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
|
||||||
|
decommissionNode(fsn, localFileSys, dnName);
|
||||||
|
dm.refreshNodes(conf);
|
||||||
|
BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
|
||||||
|
assertTrue(dnDescriptor.isDecommissioned());
|
||||||
|
|
||||||
|
// Add the node back
|
||||||
|
cluster.restartDataNode(stoppedDN, true);
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Call refreshNodes on FSNamesystem with empty exclude file to remove the
|
||||||
|
// datanode from decommissioning list and make it available again.
|
||||||
|
writeConfigFile(localFileSys, excludeFile, null);
|
||||||
|
dm.refreshNodes(conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
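
The test exercises the full lifecycle the fix enables: stop the datanode, wait for the namenode to mark it dead, decommission it through the exclude file, and assert it goes straight to DECOMMISSIONED with no decommission-in-progress phase. Restarting the node against an empty exclude file at the end returns the cluster to a clean state for the other tests in the class.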