HDFS-2882. DN continues to start up, even if block pool fails to initialize (Contributed by Vinayakumar B)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1590941 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinayakumar B 2014-04-29 10:27:29 +00:00
parent 02d28907be
commit 9d21180c1a
10 changed files with 205 additions and 47 deletions

View File

@ -470,6 +470,9 @@ Release 2.4.1 - UNRELEASED
HDFS-6245. datanode fails to start with a bad disk even when failed
volumes is set. (Arpit Agarwal)
HDFS-2882. DN continues to start up, even if block pool fails to initialize
(vinayakumarb)
Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES

View File

@ -146,6 +146,10 @@ class BPOfferService {
} }
} }
/**
 * Reports whether namespace info has been recorded for this service.
 *
 * @return true when {@link #getNamespaceInfo()} yields a non-null value
 */
boolean hasBlockPoolId() {
  if (getNamespaceInfo() == null) {
    return false;
  }
  return true;
}
synchronized NamespaceInfo getNamespaceInfo() { synchronized NamespaceInfo getNamespaceInfo() {
return bpNSInfo; return bpNSInfo;
} }
@ -679,4 +683,17 @@ class BPOfferService {
return true; return true;
} }
/**
 * Tells an actor whether it should keep retrying initialization.
 * The actor is allowed to retry until all namenodes of the cluster have
 * failed.
 *
 * @return true if namespace info is already known (one of the namenodes
 *         registered successfully, so the others keep retrying); otherwise
 *         the result of {@code isAlive()}
 */
boolean shouldRetryInit() {
  // Short-circuit: isAlive() is consulted only when no block pool id exists.
  return hasBlockPoolId() || isAlive();
}
} }

View File

@ -90,7 +90,12 @@ class BPServiceActor implements Runnable {
Thread bpThread; Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode; DatanodeProtocolClientSideTranslatorPB bpNamenode;
private volatile long lastHeartbeat = 0; private volatile long lastHeartbeat = 0;
private volatile boolean initialized = false;
/**
 * Lifecycle states recorded in the actor's {@code runningState} field.
 * <ul>
 * <li>CONNECTING - the value the field starts with, before the handshake
 * with the namenode has completed.</li>
 * <li>INIT_FAILED - set when connectToNNAndHandshake() throws an
 * IOException; the actor may still retry.</li>
 * <li>RUNNING - set once the handshake has succeeded.</li>
 * <li>EXITED - set after the offer-service loop ends without an
 * exception.</li>
 * <li>FAILED - set when initialization is given up, or when an unexpected
 * Throwable escapes the service loop.</li>
 * </ul>
 * isAlive() treats only RUNNING and CONNECTING as live states.
 */
enum RunningState {
  // A nested enum is implicitly static, so the modifier is omitted.
  CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED
}
private volatile RunningState runningState = RunningState.CONNECTING;
/** /**
* Between block reports (which happen on the order of once an hour) the * Between block reports (which happen on the order of once an hour) the
@ -118,17 +123,12 @@ class BPServiceActor implements Runnable {
this.dnConf = dn.getDnConf(); this.dnConf = dn.getDnConf();
} }
/**
* returns true if BP thread has completed initialization of storage
* and has registered with the corresponding namenode
* @return true if initialized
*/
boolean isInitialized() {
return initialized;
}
boolean isAlive() { boolean isAlive() {
return shouldServiceRun && bpThread.isAlive(); if (!shouldServiceRun || !bpThread.isAlive()) {
return false;
}
return runningState == BPServiceActor.RunningState.RUNNING
|| runningState == BPServiceActor.RunningState.CONNECTING;
} }
@Override @Override
@ -805,18 +805,29 @@ class BPServiceActor implements Runnable {
LOG.info(this + " starting to offer service"); LOG.info(this + " starting to offer service");
try { try {
// init stuff while (true) {
try { // init stuff
// setup storage try {
connectToNNAndHandshake(); // setup storage
} catch (IOException ioe) { connectToNNAndHandshake();
// Initial handshake, storage recovery or registration failed break;
// End BPOfferService thread } catch (IOException ioe) {
LOG.fatal("Initialization failed for block pool " + this, ioe); // Initial handshake, storage recovery or registration failed
return; runningState = RunningState.INIT_FAILED;
if (shouldRetryInit()) {
// Retry until all namenode's of BPOS failed initialization
LOG.error("Initialization failed for " + this + " "
+ ioe.getLocalizedMessage());
sleepAndLogInterrupts(5000, "initializing");
} else {
runningState = RunningState.FAILED;
LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
return;
}
}
} }
initialized = true; // bp is initialized; runningState = RunningState.RUNNING;
while (shouldRun()) { while (shouldRun()) {
try { try {
@ -826,14 +837,20 @@ class BPServiceActor implements Runnable {
sleepAndLogInterrupts(5000, "offering service"); sleepAndLogInterrupts(5000, "offering service");
} }
} }
runningState = RunningState.EXITED;
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.warn("Unexpected exception in block pool " + this, ex); LOG.warn("Unexpected exception in block pool " + this, ex);
runningState = RunningState.FAILED;
} finally { } finally {
LOG.warn("Ending block pool service for: " + this); LOG.warn("Ending block pool service for: " + this);
cleanUp(); cleanUp();
} }
} }
/**
 * Decides whether a failed initialization attempt should be repeated.
 *
 * @return false as soon as shouldRun() is false; otherwise whatever the
 *         owning BPOfferService answers from its own shouldRetryInit()
 */
private boolean shouldRetryInit() {
  if (!shouldRun()) {
    return false;
  }
  return bpos.shouldRetryInit();
}
private boolean shouldRun() { private boolean shouldRun() {
return shouldServiceRun && dn.shouldRun(); return shouldServiceRun && dn.shouldRun();
} }

View File

@ -88,7 +88,11 @@ class BlockPoolManager {
synchronized void remove(BPOfferService t) { synchronized void remove(BPOfferService t) {
offerServices.remove(t); offerServices.remove(t);
bpByBlockPoolId.remove(t.getBlockPoolId()); if (t.hasBlockPoolId()) {
// It's possible that the block pool never successfully registered
// with any NN, so it was never added it to this map
bpByBlockPoolId.remove(t.getBlockPoolId());
}
boolean removed = false; boolean removed = false;
for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator(); for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();

View File

@ -847,19 +847,24 @@ public class DataNode extends Configured
*/ */
void shutdownBlockPool(BPOfferService bpos) { void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos); blockPoolManager.remove(bpos);
if (bpos.hasBlockPoolId()) {
// Possible that this is shutting down before successfully
// registering anywhere. If that's the case, we wouldn't have
// a block pool id
String bpId = bpos.getBlockPoolId();
if (blockScanner != null) {
blockScanner.removeBlockPool(bpId);
}
String bpId = bpos.getBlockPoolId(); if (data != null) {
if (blockScanner != null) { data.shutdownBlockPool(bpId);
blockScanner.removeBlockPool(bpId); }
if (storage != null) {
storage.removeBlockPoolStorage(bpId);
}
} }
if (data != null) {
data.shutdownBlockPool(bpId);
}
if (storage != null) {
storage.removeBlockPoolStorage(bpId);
}
} }
/** /**
@ -880,11 +885,11 @@ public class DataNode extends Configured
+ " should have retrieved namespace info before initBlockPool."); + " should have retrieved namespace info before initBlockPool.");
} }
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// Register the new block pool with the BP manager. // Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos); blockPoolManager.addBlockPool(bpos);
setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// In the case that this is the first block pool to connect, initialize // In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc. // the dataset, block scanners, etc.
initStorage(nsInfo); initStorage(nsInfo);
@ -1067,6 +1072,7 @@ public class DataNode extends Configured
Token<BlockTokenIdentifier> token) throws IOException { Token<BlockTokenIdentifier> token) throws IOException {
checkBlockLocalPathAccess(); checkBlockLocalPathAccess();
checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
Preconditions.checkNotNull(data, "Storage not yet initialized");
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
if (info != null) { if (info != null) {
@ -2427,6 +2433,7 @@ public class DataNode extends Configured
*/ */
@Override // DataNodeMXBean @Override // DataNodeMXBean
public String getVolumeInfo() { public String getVolumeInfo() {
Preconditions.checkNotNull(data, "Storage not yet initialized");
return JSON.toString(data.getVolumeInfoMap()); return JSON.toString(data.getVolumeInfoMap());
} }

View File

@ -221,9 +221,14 @@ public class DataStorage extends Storage {
// Each storage directory is treated individually. // Each storage directory is treated individually.
// During startup some of them can upgrade or rollback // During startup some of them can upgrade or rollback
// while others could be uptodate for the regular startup. // while others could be uptodate for the regular startup.
for(int idx = 0; idx < getNumStorageDirs(); idx++) { try {
doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); for (int idx = 0; idx < getNumStorageDirs(); idx++) {
createStorageID(getStorageDir(idx)); doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
createStorageID(getStorageDir(idx));
}
} catch (IOException e) {
unlockAll();
throw e;
} }
// 3. Update all storages. Some of them might have just been formatted. // 3. Update all storages. Some of them might have just been formatted.

View File

@ -842,6 +842,13 @@ public class MiniDFSCluster {
nnCounterForFormat++; nnCounterForFormat++;
if (formatThisOne) { if (formatThisOne) {
// Allow overriding clusterID for specific NNs to test
// misconfiguration.
if (nn.getClusterId() == null) {
StartupOption.FORMAT.setClusterId(clusterId);
} else {
StartupOption.FORMAT.setClusterId(nn.getClusterId());
}
DFSTestUtil.formatNameNode(conf); DFSTestUtil.formatNameNode(conf);
} }
prevNNDirs = namespaceDirs; prevNNDirs = namespaceDirs;
@ -903,7 +910,7 @@ public class MiniDFSCluster {
} }
} }
private void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs, public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
Configuration dstConf) throws IOException { Configuration dstConf) throws IOException {
URI srcDir = Lists.newArrayList(srcDirs).get(0); URI srcDir = Lists.newArrayList(srcDirs).get(0);
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw(); FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();

View File

@ -211,6 +211,7 @@ public class MiniDFSNNTopology {
private final String nnId; private final String nnId;
private int httpPort; private int httpPort;
private int ipcPort; private int ipcPort;
private String clusterId;
public NNConf(String nnId) { public NNConf(String nnId) {
this.nnId = nnId; this.nnId = nnId;
@ -228,6 +229,10 @@ public class MiniDFSNNTopology {
return httpPort; return httpPort;
} }
/**
 * @return the cluster id set through setClusterId(String), or null when
 *         none has been set for this namenode
 */
String getClusterId() {
  return this.clusterId;
}
public NNConf setHttpPort(int httpPort) { public NNConf setHttpPort(int httpPort) {
this.httpPort = httpPort; this.httpPort = httpPort;
return this; return this;
@ -237,6 +242,11 @@ public class MiniDFSNNTopology {
this.ipcPort = ipcPort; this.ipcPort = ipcPort;
return this; return this;
} }
/**
 * Records a cluster id for this namenode configuration.
 *
 * @param clusterId the cluster id to store
 * @return this NNConf, so that calls can be chained
 */
public NNConf setClusterId(String clusterId) {
this.clusterId = clusterId;
return this;
}
} }
} }

View File

@ -329,7 +329,9 @@ public class TestBPOfferService {
try { try {
waitForInitialization(bpos); waitForInitialization(bpos);
List<BPServiceActor> actors = bpos.getBPServiceActors(); List<BPServiceActor> actors = bpos.getBPServiceActors();
assertEquals(1, actors.size()); // even if one of the actor initialization fails also other will be
// running until both failed.
assertEquals(2, actors.size());
BPServiceActor actor = actors.get(0); BPServiceActor actor = actors.get(0);
waitForBlockReport(actor.getNameNodeProxy()); waitForBlockReport(actor.getNameNodeProxy());
} finally { } finally {
@ -342,7 +344,14 @@ public class TestBPOfferService {
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
return bpos.countNameNodes() == 1; List<BPServiceActor> actors = bpos.getBPServiceActors();
int failedcount = 0;
for (BPServiceActor actor : actors) {
if (!actor.isAlive()) {
failedcount++;
}
}
return failedcount == 1;
} }
}, 100, 10000); }, 100, 10000);
} }

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -30,11 +32,14 @@ import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Assert; import org.junit.Assert;
@ -189,7 +194,7 @@ public class TestDataNodeMultipleRegistrations {
} }
@Test @Test
public void testClusterIdMismatch() throws IOException { public void testClusterIdMismatch() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.build(); .build();
@ -203,6 +208,7 @@ public class TestDataNodeMultipleRegistrations {
// add another namenode // add another namenode
cluster.addNameNode(conf, 9938); cluster.addNameNode(conf, 9938);
Thread.sleep(500);// lets wait for the registration to happen
bposs = dn.getAllBpOs(); bposs = dn.getAllBpOs();
LOG.info("dn bpos len (should be 3):" + bposs.length); LOG.info("dn bpos len (should be 3):" + bposs.length);
Assert.assertEquals("should've registered with three namenodes", bposs.length,3); Assert.assertEquals("should've registered with three namenodes", bposs.length,3);
@ -213,14 +219,88 @@ public class TestDataNodeMultipleRegistrations {
NameNode nn4 = cluster.getNameNode(3); NameNode nn4 = cluster.getNameNode(3);
assertNotNull("cannot create nn4", nn4); assertNotNull("cannot create nn4", nn4);
Thread.sleep(500);// lets wait for the registration to happen
bposs = dn.getAllBpOs(); bposs = dn.getAllBpOs();
LOG.info("dn bpos len (still should be 3):" + bposs.length); LOG.info("dn bpos len (still should be 3):" + bposs.length);
Assert.assertEquals("should've registered with three namenodes", 3, bposs.length); Assert.assertEquals("should've registered with three namenodes", 3, bposs.length);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 20000)
public void testClusterIdMismatchAtStartupWithHA() throws Exception {
MiniDFSNNTopology top = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn0"))
.addNN(new MiniDFSNNTopology.NNConf("nn1")))
.addNameservice(new MiniDFSNNTopology.NSConf("ns2")
.addNN(new MiniDFSNNTopology.NNConf("nn2").setClusterId("bad-cid"))
.addNN(new MiniDFSNNTopology.NNConf("nn3").setClusterId("bad-cid")));
top.setFederation(true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
.numDataNodes(0).build();
try {
cluster.startDataNodes(conf, 1, true, null, null);
// let the initialization be complete
Thread.sleep(10000);
DataNode dn = cluster.getDataNodes().get(0);
assertTrue("Datanode should be running", dn.isDatanodeUp());
assertEquals("Only one BPOfferService should be running", 1,
dn.getAllBpOs().length);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
 * Starts one nameservice with two namenodes that share the cluster id
 * "cluster-1", brings up a single datanode and checks that it is up with
 * exactly one BPOfferService. The datanode and both namenodes are then
 * stopped, the first namenode's storage is re-formatted under the cluster id
 * "cluster-2" and copied to the second namenode, and everything is
 * restarted. The datanode is expected not to be up after the restart.
 *
 * NOTE(review): both phases wait with a fixed 10 second sleep rather than
 * polling, and the test declares no timeout.
 */
@Test
public void testDNWithInvalidStorageWithHA() throws Exception {
// Single nameservice "ns1" with two namenodes, both given "cluster-1".
MiniDFSNNTopology top = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn0").setClusterId("cluster-1"))
.addNN(new MiniDFSNNTopology.NNConf("nn1").setClusterId("cluster-1")));
top.setFederation(true);
// Build the cluster without datanodes; one is started explicitly below.
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(top)
.numDataNodes(0).build();
try {
cluster.startDataNodes(conf, 1, true, null, null);
// let the initialization be complete
Thread.sleep(10000);
DataNode dn = cluster.getDataNodes().get(0);
assertTrue("Datanode should be running", dn.isDatanodeUp());
assertEquals("BPOfferService should be running", 1,
dn.getAllBpOs().length);
// Stop the datanode (keeping its properties for the restart) and both
// namenodes before changing the namenode storage.
DataNodeProperties dnProp = cluster.stopDataNode(0);
cluster.getNameNode(0).stop();
cluster.getNameNode(1).stop();
// nn1 and nn2 are the Configuration objects of namenode 0 and namenode 1.
Configuration nn1 = cluster.getConfiguration(0);
Configuration nn2 = cluster.getConfiguration(1);
// setting up invalid cluster
StartupOption.FORMAT.setClusterId("cluster-2");
DFSTestUtil.formatNameNode(nn1);
// Copy the freshly formatted name dirs to the second namenode.
MiniDFSCluster.copyNameDirs(FSNamesystem.getNamespaceDirs(nn1),
FSNamesystem.getNamespaceDirs(nn2), nn2);
cluster.restartNameNode(0, false);
cluster.restartNameNode(1, false);
cluster.restartDataNode(dnProp);
// let the initialization be complete
Thread.sleep(10000);
dn = cluster.getDataNodes().get(0);
assertFalse("Datanode should have shutdown as only service failed",
dn.isDatanodeUp());
} finally {
cluster.shutdown();
}
}
@Test @Test
public void testMiniDFSClusterWithMultipleNN() throws IOException { public void testMiniDFSClusterWithMultipleNN() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
@ -231,7 +311,6 @@ public class TestDataNodeMultipleRegistrations {
// add a node // add a node
try { try {
Assert.assertNotNull(cluster);
cluster.waitActive(); cluster.waitActive();
Assert.assertEquals("(1)Should be 2 namenodes", 2, cluster.getNumNameNodes()); Assert.assertEquals("(1)Should be 2 namenodes", 2, cluster.getNumNameNodes());