Merge r1590941 from trunk, HDFS-2882. DN continues to start up, even if block pool fails to initialize (Contributed by Vinayakumar B)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1590944 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f37186167a
commit
8be14bbf65
|
@ -218,6 +218,9 @@ Release 2.4.1 - UNRELEASED
|
|||
HDFS-6245. datanode fails to start with a bad disk even when failed
|
||||
volumes is set. (Arpit Agarwal)
|
||||
|
||||
HDFS-2882. DN continues to start up, even if block pool fails to initialize
|
||||
(vinayakumarb)
|
||||
|
||||
Release 2.4.0 - 2014-04-07
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -145,7 +145,11 @@ class BPOfferService {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
boolean hasBlockPoolId() {
|
||||
return getNamespaceInfo() != null;
|
||||
}
|
||||
|
||||
synchronized NamespaceInfo getNamespaceInfo() {
|
||||
return bpNSInfo;
|
||||
}
|
||||
|
@ -680,4 +684,17 @@ class BPOfferService {
|
|||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Let the actor retry for initialization until all namenodes of cluster have
|
||||
* failed.
|
||||
*/
|
||||
boolean shouldRetryInit() {
|
||||
if (hasBlockPoolId()) {
|
||||
// One of the namenode registered successfully. lets continue retry for
|
||||
// other.
|
||||
return true;
|
||||
}
|
||||
return isAlive();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -90,8 +90,13 @@ class BPServiceActor implements Runnable {
|
|||
Thread bpThread;
|
||||
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
||||
private volatile long lastHeartbeat = 0;
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
|
||||
static enum RunningState {
|
||||
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
|
||||
}
|
||||
|
||||
private volatile RunningState runningState = RunningState.CONNECTING;
|
||||
|
||||
/**
|
||||
* Between block reports (which happen on the order of once an hour) the
|
||||
* DN reports smaller incremental changes to its block list. This map,
|
||||
|
@ -118,17 +123,12 @@ class BPServiceActor implements Runnable {
|
|||
this.dnConf = dn.getDnConf();
|
||||
}
|
||||
|
||||
/**
|
||||
* returns true if BP thread has completed initialization of storage
|
||||
* and has registered with the corresponding namenode
|
||||
* @return true if initialized
|
||||
*/
|
||||
boolean isInitialized() {
|
||||
return initialized;
|
||||
}
|
||||
|
||||
boolean isAlive() {
|
||||
return shouldServiceRun && bpThread.isAlive();
|
||||
if (!shouldServiceRun || !bpThread.isAlive()) {
|
||||
return false;
|
||||
}
|
||||
return runningState == BPServiceActor.RunningState.RUNNING
|
||||
|| runningState == BPServiceActor.RunningState.CONNECTING;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -805,19 +805,30 @@ class BPServiceActor implements Runnable {
|
|||
LOG.info(this + " starting to offer service");
|
||||
|
||||
try {
|
||||
// init stuff
|
||||
try {
|
||||
// setup storage
|
||||
connectToNNAndHandshake();
|
||||
} catch (IOException ioe) {
|
||||
// Initial handshake, storage recovery or registration failed
|
||||
// End BPOfferService thread
|
||||
LOG.fatal("Initialization failed for block pool " + this, ioe);
|
||||
return;
|
||||
while (true) {
|
||||
// init stuff
|
||||
try {
|
||||
// setup storage
|
||||
connectToNNAndHandshake();
|
||||
break;
|
||||
} catch (IOException ioe) {
|
||||
// Initial handshake, storage recovery or registration failed
|
||||
runningState = RunningState.INIT_FAILED;
|
||||
if (shouldRetryInit()) {
|
||||
// Retry until all namenode's of BPOS failed initialization
|
||||
LOG.error("Initialization failed for " + this + " "
|
||||
+ ioe.getLocalizedMessage());
|
||||
sleepAndLogInterrupts(5000, "initializing");
|
||||
} else {
|
||||
runningState = RunningState.FAILED;
|
||||
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
initialized = true; // bp is initialized;
|
||||
|
||||
runningState = RunningState.RUNNING;
|
||||
|
||||
while (shouldRun()) {
|
||||
try {
|
||||
offerService();
|
||||
|
@ -826,14 +837,20 @@ class BPServiceActor implements Runnable {
|
|||
sleepAndLogInterrupts(5000, "offering service");
|
||||
}
|
||||
}
|
||||
runningState = RunningState.EXITED;
|
||||
} catch (Throwable ex) {
|
||||
LOG.warn("Unexpected exception in block pool " + this, ex);
|
||||
runningState = RunningState.FAILED;
|
||||
} finally {
|
||||
LOG.warn("Ending block pool service for: " + this);
|
||||
cleanUp();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldRetryInit() {
|
||||
return shouldRun() && bpos.shouldRetryInit();
|
||||
}
|
||||
|
||||
private boolean shouldRun() {
|
||||
return shouldServiceRun && dn.shouldRun();
|
||||
}
|
||||
|
|
|
@ -88,7 +88,11 @@ class BlockPoolManager {
|
|||
|
||||
synchronized void remove(BPOfferService t) {
|
||||
offerServices.remove(t);
|
||||
bpByBlockPoolId.remove(t.getBlockPoolId());
|
||||
if (t.hasBlockPoolId()) {
|
||||
// It's possible that the block pool never successfully registered
|
||||
// with any NN, so it was never added it to this map
|
||||
bpByBlockPoolId.remove(t.getBlockPoolId());
|
||||
}
|
||||
|
||||
boolean removed = false;
|
||||
for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
|
||||
|
|
|
@ -854,19 +854,24 @@ public class DataNode extends Configured
|
|||
*/
|
||||
void shutdownBlockPool(BPOfferService bpos) {
|
||||
blockPoolManager.remove(bpos);
|
||||
if (bpos.hasBlockPoolId()) {
|
||||
// Possible that this is shutting down before successfully
|
||||
// registering anywhere. If that's the case, we wouldn't have
|
||||
// a block pool id
|
||||
String bpId = bpos.getBlockPoolId();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.removeBlockPool(bpId);
|
||||
}
|
||||
|
||||
String bpId = bpos.getBlockPoolId();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.removeBlockPool(bpId);
|
||||
}
|
||||
|
||||
if (data != null) {
|
||||
data.shutdownBlockPool(bpId);
|
||||
if (data != null) {
|
||||
data.shutdownBlockPool(bpId);
|
||||
}
|
||||
|
||||
if (storage != null) {
|
||||
storage.removeBlockPoolStorage(bpId);
|
||||
}
|
||||
}
|
||||
|
||||
if (storage != null) {
|
||||
storage.removeBlockPoolStorage(bpId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -887,10 +892,10 @@ public class DataNode extends Configured
|
|||
+ " should have retrieved namespace info before initBlockPool.");
|
||||
}
|
||||
|
||||
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
|
||||
|
||||
// Register the new block pool with the BP manager.
|
||||
blockPoolManager.addBlockPool(bpos);
|
||||
|
||||
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
|
||||
|
||||
// In the case that this is the first block pool to connect, initialize
|
||||
// the dataset, block scanners, etc.
|
||||
|
@ -1074,6 +1079,7 @@ public class DataNode extends Configured
|
|||
Token<BlockTokenIdentifier> token) throws IOException {
|
||||
checkBlockLocalPathAccess();
|
||||
checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
|
||||
Preconditions.checkNotNull(data, "Storage not yet initialized");
|
||||
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (info != null) {
|
||||
|
@ -2434,6 +2440,7 @@ public class DataNode extends Configured
|
|||
*/
|
||||
@Override // DataNodeMXBean
|
||||
public String getVolumeInfo() {
|
||||
Preconditions.checkNotNull(data, "Storage not yet initialized");
|
||||
return JSON.toString(data.getVolumeInfoMap());
|
||||
}
|
||||
|
||||
|
|
|
@ -221,11 +221,16 @@ public class DataStorage extends Storage {
|
|||
// Each storage directory is treated individually.
|
||||
// During startup some of them can upgrade or rollback
|
||||
// while others could be uptodate for the regular startup.
|
||||
for(int idx = 0; idx < getNumStorageDirs(); idx++) {
|
||||
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
|
||||
createStorageID(getStorageDir(idx));
|
||||
try {
|
||||
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
|
||||
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
|
||||
createStorageID(getStorageDir(idx));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
unlockAll();
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
||||
// 3. Update all storages. Some of them might have just been formatted.
|
||||
this.writeAll();
|
||||
|
||||
|
|
|
@ -844,6 +844,13 @@ public class MiniDFSCluster {
|
|||
|
||||
nnCounterForFormat++;
|
||||
if (formatThisOne) {
|
||||
// Allow overriding clusterID for specific NNs to test
|
||||
// misconfiguration.
|
||||
if (nn.getClusterId() == null) {
|
||||
StartupOption.FORMAT.setClusterId(clusterId);
|
||||
} else {
|
||||
StartupOption.FORMAT.setClusterId(nn.getClusterId());
|
||||
}
|
||||
DFSTestUtil.formatNameNode(conf);
|
||||
}
|
||||
prevNNDirs = namespaceDirs;
|
||||
|
@ -905,7 +912,7 @@ public class MiniDFSCluster {
|
|||
}
|
||||
}
|
||||
|
||||
private void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
|
||||
public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
|
||||
Configuration dstConf) throws IOException {
|
||||
URI srcDir = Lists.newArrayList(srcDirs).get(0);
|
||||
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
|
||||
|
|
|
@ -211,6 +211,7 @@ public class MiniDFSNNTopology {
|
|||
private final String nnId;
|
||||
private int httpPort;
|
||||
private int ipcPort;
|
||||
private String clusterId;
|
||||
|
||||
public NNConf(String nnId) {
|
||||
this.nnId = nnId;
|
||||
|
@ -228,6 +229,10 @@ public class MiniDFSNNTopology {
|
|||
return httpPort;
|
||||
}
|
||||
|
||||
String getClusterId() {
|
||||
return clusterId;
|
||||
}
|
||||
|
||||
public NNConf setHttpPort(int httpPort) {
|
||||
this.httpPort = httpPort;
|
||||
return this;
|
||||
|
@ -237,6 +242,11 @@ public class MiniDFSNNTopology {
|
|||
this.ipcPort = ipcPort;
|
||||
return this;
|
||||
}
|
||||
|
||||
public NNConf setClusterId(String clusterId) {
|
||||
this.clusterId = clusterId;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -329,7 +329,9 @@ public class TestBPOfferService {
|
|||
try {
|
||||
waitForInitialization(bpos);
|
||||
List<BPServiceActor> actors = bpos.getBPServiceActors();
|
||||
assertEquals(1, actors.size());
|
||||
// even if one of the actor initialization fails also other will be
|
||||
// running until both failed.
|
||||
assertEquals(2, actors.size());
|
||||
BPServiceActor actor = actors.get(0);
|
||||
waitForBlockReport(actor.getNameNodeProxy());
|
||||
} finally {
|
||||
|
@ -342,7 +344,14 @@ public class TestBPOfferService {
|
|||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return bpos.countNameNodes() == 1;
|
||||
List<BPServiceActor> actors = bpos.getBPServiceActors();
|
||||
int failedcount = 0;
|
||||
for (BPServiceActor actor : actors) {
|
||||
if (!actor.isAlive()) {
|
||||
failedcount++;
|
||||
}
|
||||
}
|
||||
return failedcount == 1;
|
||||
}
|
||||
}, 100, 10000);
|
||||
}
|
||||
|
|
|
@ -19,8 +19,10 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -30,11 +32,14 @@ import java.util.Map;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -189,7 +194,7 @@ public class TestDataNodeMultipleRegistrations {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testClusterIdMismatch() throws IOException {
|
||||
public void testClusterIdMismatch() throws Exception {
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.build();
|
||||
|
@ -203,6 +208,7 @@ public class TestDataNodeMultipleRegistrations {
|
|||
|
||||
// add another namenode
|
||||
cluster.addNameNode(conf, 9938);
|
||||
Thread.sleep(500);// lets wait for the registration to happen
|
||||
bposs = dn.getAllBpOs();
|
||||
LOG.info("dn bpos len (should be 3):" + bposs.length);
|
||||
Assert.assertEquals("should've registered with three namenodes", bposs.length,3);
|
||||
|
@ -212,15 +218,89 @@ public class TestDataNodeMultipleRegistrations {
|
|||
cluster.addNameNode(conf, 9948);
|
||||
NameNode nn4 = cluster.getNameNode(3);
|
||||
assertNotNull("cannot create nn4", nn4);
|
||||
|
||||
|
||||
Thread.sleep(500);// lets wait for the registration to happen
|
||||
bposs = dn.getAllBpOs();
|
||||
LOG.info("dn bpos len (still should be 3):" + bposs.length);
|
||||
Assert.assertEquals("should've registered with three namenodes", 3, bposs.length);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testClusterIdMismatchAtStartupWithHA() throws Exception {
|
||||
MiniDFSNNTopology top = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn0"))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1")))
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns2")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn2").setClusterId("bad-cid"))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn3").setClusterId("bad-cid")));
|
||||
|
||||
top.setFederation(true);
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
|
||||
.numDataNodes(0).build();
|
||||
|
||||
try {
|
||||
cluster.startDataNodes(conf, 1, true, null, null);
|
||||
// let the initialization be complete
|
||||
Thread.sleep(10000);
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
assertTrue("Datanode should be running", dn.isDatanodeUp());
|
||||
assertEquals("Only one BPOfferService should be running", 1,
|
||||
dn.getAllBpOs().length);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDNWithInvalidStorageWithHA() throws Exception {
|
||||
MiniDFSNNTopology top = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn0").setClusterId("cluster-1"))
|
||||
.addNN(new MiniDFSNNTopology.NNConf("nn1").setClusterId("cluster-1")));
|
||||
|
||||
top.setFederation(true);
|
||||
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
|
||||
.numDataNodes(0).build();
|
||||
try {
|
||||
cluster.startDataNodes(conf, 1, true, null, null);
|
||||
// let the initialization be complete
|
||||
Thread.sleep(10000);
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
assertTrue("Datanode should be running", dn.isDatanodeUp());
|
||||
assertEquals("BPOfferService should be running", 1,
|
||||
dn.getAllBpOs().length);
|
||||
DataNodeProperties dnProp = cluster.stopDataNode(0);
|
||||
|
||||
cluster.getNameNode(0).stop();
|
||||
cluster.getNameNode(1).stop();
|
||||
Configuration nn1 = cluster.getConfiguration(0);
|
||||
Configuration nn2 = cluster.getConfiguration(1);
|
||||
// setting up invalid cluster
|
||||
StartupOption.FORMAT.setClusterId("cluster-2");
|
||||
DFSTestUtil.formatNameNode(nn1);
|
||||
MiniDFSCluster.copyNameDirs(FSNamesystem.getNamespaceDirs(nn1),
|
||||
FSNamesystem.getNamespaceDirs(nn2), nn2);
|
||||
cluster.restartNameNode(0, false);
|
||||
cluster.restartNameNode(1, false);
|
||||
cluster.restartDataNode(dnProp);
|
||||
|
||||
// let the initialization be complete
|
||||
Thread.sleep(10000);
|
||||
dn = cluster.getDataNodes().get(0);
|
||||
assertFalse("Datanode should have shutdown as only service failed",
|
||||
dn.isDatanodeUp());
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMiniDFSClusterWithMultipleNN() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
|
@ -231,7 +311,6 @@ public class TestDataNodeMultipleRegistrations {
|
|||
|
||||
// add a node
|
||||
try {
|
||||
Assert.assertNotNull(cluster);
|
||||
cluster.waitActive();
|
||||
Assert.assertEquals("(1)Should be 2 namenodes", 2, cluster.getNumNameNodes());
|
||||
|
||||
|
|
Loading…
Reference in New Issue