HDFS-4201. NPE in BPServiceActor#sendHeartBeat (jxiang via cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1550268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
52125d151d
commit
36488a641a
|
@ -370,6 +370,8 @@ Release 2.3.0 - UNRELEASED
|
||||||
HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a
|
HDFS-5074. Allow starting up from an fsimage checkpoint in the middle of a
|
||||||
segment. (Todd Lipcon via atm)
|
segment. (Todd Lipcon via atm)
|
||||||
|
|
||||||
|
HDFS-4201. NPE in BPServiceActor#sendHeartBeat. (jxiang via cmccabe)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -273,12 +273,22 @@ class BPOfferService {
|
||||||
synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
||||||
if (this.bpNSInfo == null) {
|
if (this.bpNSInfo == null) {
|
||||||
this.bpNSInfo = nsInfo;
|
this.bpNSInfo = nsInfo;
|
||||||
|
boolean success = false;
|
||||||
|
|
||||||
// Now that we know the namespace ID, etc, we can pass this to the DN.
|
// Now that we know the namespace ID, etc, we can pass this to the DN.
|
||||||
// The DN can now initialize its local storage if we are the
|
// The DN can now initialize its local storage if we are the
|
||||||
// first BP to handshake, etc.
|
// first BP to handshake, etc.
|
||||||
dn.initBlockPool(this);
|
try {
|
||||||
return;
|
dn.initBlockPool(this);
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
// The datanode failed to initialize the BP. We need to reset
|
||||||
|
// the namespace info so that other BPService actors still have
|
||||||
|
// a chance to set it, and re-initialize the datanode.
|
||||||
|
this.bpNSInfo = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
|
checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
|
||||||
"Blockpool ID");
|
"Blockpool ID");
|
||||||
|
|
|
@ -25,7 +25,9 @@ import static org.junit.Assert.assertSame;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -292,6 +294,47 @@ public class TestBPOfferService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test datanode block pool initialization error handling.
|
||||||
|
* Failure in initializing a block pool should not cause NPE.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBPInitErrorHandling() throws Exception {
|
||||||
|
final DataNode mockDn = Mockito.mock(DataNode.class);
|
||||||
|
Mockito.doReturn(true).when(mockDn).shouldRun();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
File dnDataDir = new File(
|
||||||
|
new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
|
||||||
|
conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
|
||||||
|
Mockito.doReturn(conf).when(mockDn).getConf();
|
||||||
|
Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
|
||||||
|
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
|
||||||
|
when(mockDn).getMetrics();
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
Mockito.doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
if (count.getAndIncrement() == 0) {
|
||||||
|
throw new IOException("faked initBlockPool exception");
|
||||||
|
}
|
||||||
|
// The initBlockPool is called again. Now mock init is done.
|
||||||
|
Mockito.doReturn(mockFSDataset).when(mockDn).getFSDataset();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(mockDn).initBlockPool(Mockito.any(BPOfferService.class));
|
||||||
|
BPOfferService bpos = setupBPOSForNNs(mockDn, mockNN1, mockNN2);
|
||||||
|
bpos.start();
|
||||||
|
try {
|
||||||
|
waitForInitialization(bpos);
|
||||||
|
List<BPServiceActor> actors = bpos.getBPServiceActors();
|
||||||
|
assertEquals(1, actors.size());
|
||||||
|
BPServiceActor actor = actors.get(0);
|
||||||
|
waitForBlockReport(actor.getNameNodeProxy());
|
||||||
|
} finally {
|
||||||
|
bpos.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void waitForOneToFail(final BPOfferService bpos)
|
private void waitForOneToFail(final BPOfferService bpos)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@ -309,6 +352,11 @@ public class TestBPOfferService {
|
||||||
*/
|
*/
|
||||||
private BPOfferService setupBPOSForNNs(
|
private BPOfferService setupBPOSForNNs(
|
||||||
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
|
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
|
||||||
|
return setupBPOSForNNs(mockDn, nns);
|
||||||
|
}
|
||||||
|
|
||||||
|
private BPOfferService setupBPOSForNNs(DataNode mockDn,
|
||||||
|
DatanodeProtocolClientSideTranslatorPB ... nns) throws IOException {
|
||||||
// Set up some fake InetAddresses, then override the connectToNN
|
// Set up some fake InetAddresses, then override the connectToNN
|
||||||
// function to return the corresponding proxies.
|
// function to return the corresponding proxies.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue