HDFS-2920. fix remaining TODO items. Contributed by Aaron T. Myers and Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1294923 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-02-29 01:09:07 +00:00
parent c69dfdd5e1
commit 978a8050e2
24 changed files with 122 additions and 132 deletions

View File

@@ -580,6 +580,12 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
    * @param proxy the RPC proxy object to be stopped
    */
   public static void stopProxy(Object proxy) {
+    if (proxy instanceof ProtocolTranslator) {
+      RPC.stopProxy(((ProtocolTranslator)proxy)
+          .getUnderlyingProxyObject());
+      return;
+    }
     InvocationHandler invocationHandler = null;
     try {
       invocationHandler = Proxy.getInvocationHandler(proxy);
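
A minimal, self-contained sketch of the unwrap-before-stop pattern this hunk introduces. The ProtocolTranslator interface and the shape of stopProxy() mirror the patch; FooProtocol, FooTranslator, and the no-op invocation handler are hypothetical stand-ins for the real RPC machinery.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class StopProxySketch {
  // Local stand-in for org.apache.hadoop.ipc.ProtocolTranslator.
  interface ProtocolTranslator {
    Object getUnderlyingProxyObject();
  }

  interface FooProtocol {}  // hypothetical RPC protocol

  // A translator is a plain object wrapping a java.lang.reflect.Proxy, so
  // Proxy.getInvocationHandler() would throw if handed the translator itself.
  static class FooTranslator implements ProtocolTranslator {
    private final FooProtocol rpcProxy;
    FooTranslator(FooProtocol rpcProxy) { this.rpcProxy = rpcProxy; }
    public Object getUnderlyingProxyObject() { return rpcProxy; }
  }

  static void stopProxy(Object proxy) {
    // Same shape as the patched RPC.stopProxy(): unwrap translators first.
    if (proxy instanceof ProtocolTranslator) {
      stopProxy(((ProtocolTranslator) proxy).getUnderlyingProxyObject());
      return;
    }
    InvocationHandler handler = Proxy.getInvocationHandler(proxy);
    System.out.println("would close connections held by " + handler);
  }

  public static void main(String[] args) {
    FooProtocol p = (FooProtocol) Proxy.newProxyInstance(
        FooProtocol.class.getClassLoader(),
        new Class<?>[] { FooProtocol.class },
        new InvocationHandler() {
          public Object invoke(Object o, Method m, Object[] a) { return null; }
        });
    stopProxy(new FooTranslator(p));  // unwraps, then reaches the handler
  }
}

This is why ClientDatanodeProtocolTranslatorPB (below) can implement ProtocolTranslator instead of special-casing Closeable in every caller.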

View File

@@ -238,3 +238,5 @@ HDFS-3013. HA: NameNode format doesn't pick up dfs.namenode.name.dir.NameService
 HDFS-3019. Fix silent failure of TestEditLogJournalFailures (todd)
 HDFS-2958. Sweep for remaining proxy construction which doesn't go through failover path. (atm)
+HDFS-2920. fix remaining TODO items. (atm and todd)

View File

@@ -418,22 +418,9 @@ boolean renewLease() throws IOException {
   /**
    * Close connections to the Namenode.
-   * The namenode variable is either a rpcProxy passed by a test or
-   * created using the protocolTranslator which is closeable.
-   * If closeable then call close, else close using RPC.stopProxy().
    */
   void closeConnectionToNamenode() {
-    if (namenode instanceof Closeable) {
-      try {
-        ((Closeable) namenode).close();
-        return;
-      } catch (IOException e) {
-        // fall through - lets try the stopProxy
-        LOG.warn("Exception closing namenode, stopping the proxy");
-      }
-    } else {
-      RPC.stopProxy(namenode);
-    }
+    RPC.stopProxy(namenode);
   }

   /** Abort and release resources held. Ignore all errors. */

View File

@@ -694,7 +694,6 @@ public boolean restoreFailedStorage(String arg)
    *
    * @throws IOException
    */
-  //TODO(HA): Should this be @Idempotent?
   public void finalizeUpgrade() throws IOException;

   /**
@@ -704,7 +703,6 @@ public boolean restoreFailedStorage(String arg)
    * @return upgrade status information or null if no upgrades are in progress
    * @throws IOException
    */
-  //TODO(HA): Should this be @Idempotent?
   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
       throws IOException;

@@ -737,7 +735,7 @@ public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
    * @param bandwidth Balancer bandwidth in bytes per second for this datanode.
    * @throws IOException
    */
-  //TODO(HA): Should this be @Idempotent?
+  @Idempotent
   public void setBalancerBandwidth(long bandwidth) throws IOException;

   /**

View File

@@ -45,6 +45,7 @@
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
@@ -63,7 +64,8 @@
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class ClientDatanodeProtocolTranslatorPB implements
-    ProtocolMetaInterface, ClientDatanodeProtocol, Closeable {
+    ProtocolMetaInterface, ClientDatanodeProtocol,
+    ProtocolTranslator, Closeable {
   public static final Log LOG = LogFactory
       .getLog(ClientDatanodeProtocolTranslatorPB.class);
@@ -211,4 +213,9 @@ public boolean isMethodSupported(String methodName) throws IOException {
         ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName);
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }

View File

@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;

View File

@@ -383,7 +383,6 @@ synchronized void shutdownActor(BPServiceActor actor) {
     bpServices.remove(actor);

-    // TODO: synchronization should be a little better here
     if (bpServices.isEmpty()) {
       dn.shutdownBlockPool(this);
@@ -392,12 +391,6 @@ synchronized void shutdownActor(BPServiceActor actor) {
     }
   }

-  @Deprecated
-  synchronized InetSocketAddress getNNSocketAddress() {
-    // TODO(HA) this doesn't make sense anymore
-    return bpServiceToActive.getNNSocketAddress();
-  }
-
   /**
    * Called by the DN to report an error to the NNs.
    */
@@ -432,11 +425,9 @@ void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
   }

   /**
-   * TODO: this is still used in a few places where we need to sort out
-   * what to do in HA!
-   * @return a proxy to the active NN
+   * @return a proxy to the active NN, or null if the BPOS has not
+   * acknowledged any NN as active yet.
    */
-  @Deprecated
   synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
     if (bpServiceToActive != null) {
       return bpServiceToActive.bpNamenode;
@@ -596,6 +587,7 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
+      // See HDFS-2987.
       throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
     case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration - at start or if NN lost contact

View File

@@ -538,8 +538,8 @@ private void offerService() throws Exception {
             DatanodeCommand cmd = blockReport();
             processCommand(new DatanodeCommand[]{ cmd });

-            // Now safe to start scanning the block pool
-            // TODO(HA): this doesn't seem quite right
+            // Now safe to start scanning the block pool.
+            // If it has already been started, this is a no-op.
             if (dn.blockScanner != null) {
               dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
             }

View File

@@ -86,16 +86,6 @@ synchronized BPOfferService get(String bpid) {
     return bpByBlockPoolId.get(bpid);
   }

-  // TODO(HA) would be good to kill this
-  synchronized BPOfferService get(InetSocketAddress addr) {
-    for (BPOfferService bpos : offerServices) {
-      if (bpos.containsNN(addr)) {
-        return bpos;
-      }
-    }
-    return null;
-  }
-
   synchronized void remove(BPOfferService t) {
     offerServices.remove(t);
     bpByBlockPoolId.remove(t.getBlockPoolId());

View File

@@ -565,6 +565,23 @@ public void reportRemoteBadBlock(DatanodeInfo srcDataNode, ExtendedBlock block)
     bpos.reportRemoteBadBlock(srcDataNode, block);
   }

+  /**
+   * Try to send an error report to the NNs associated with the given
+   * block pool.
+   * @param bpid the block pool ID
+   * @param errCode error code to send
+   * @param errMsg textual message to send
+   */
+  void trySendErrorReport(String bpid, int errCode, String errMsg) {
+    BPOfferService bpos = blockPoolManager.get(bpid);
+    if (bpos == null) {
+      throw new IllegalArgumentException("Bad block pool: " + bpid);
+    }
+    bpos.trySendErrorReport(errCode, errMsg);
+  }
+
   /**
    * Return the BPOfferService instance corresponding to the given block.
    * @param block
@@ -874,7 +891,7 @@ DatanodeRegistration getDNRegistrationByMachineName(String mName) {
     // TODO: all the BPs should have the same name as each other, they all come
     // from getName() here! and the use cases only are in tests where they just
     // call with getName(). So we could probably just make this method return
-    // the first BPOS's registration
+    // the first BPOS's registration. See HDFS-2609.
     BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
     for (BPOfferService bpos : bposArray) {
       if(bpos.bpRegistration.getName().equals(mName))
@@ -921,22 +938,6 @@ public InterDatanodeProtocol run() throws IOException {
     }
   }

-  /**
-   * get the name node address based on the block pool id
-   * @param bpid block pool ID
-   * @return namenode address corresponding to the bpid
-   */
-  public InetSocketAddress getNameNodeAddr(String bpid) {
-    // TODO(HA) this function doesn't make sense! used by upgrade code
-    // Should it return just the active one or simply return the BPService.
-    BPOfferService bp = blockPoolManager.get(bpid);
-    if (bp != null) {
-      return bp.getNNSocketAddress();
-    }
-    LOG.warn("No name node address found for block pool ID " + bpid);
-    return null;
-  }
-
   public InetSocketAddress getSelfAddr() {
     return selfAddr;
   }
@@ -1869,7 +1870,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
    * @return Namenode corresponding to the bpid
    * @throws IOException
    */
-  public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
+  public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if (bpos == null) {
@@ -1888,9 +1889,13 @@ public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
   void syncBlock(RecoveringBlock rBlock,
                  List<BlockRecord> syncList) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
-    DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
-        .getBlockPoolId());
-    assert nn != null;
+    DatanodeProtocolClientSideTranslatorPB nn =
+      getActiveNamenodeForBP(block.getBlockPoolId());
+    if (nn == null) {
+      throw new IOException(
+          "Unable to synchronize block " + rBlock + ", since this DN "
+          + " has not acknowledged any NN as active.");
+    }

     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
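
A note on the assert-to-exception change above: Java asserts are skipped unless the JVM runs with -ea, so in a production DataNode the old assert would silently fall through and the null proxy would only surface later as a NullPointerException. A tiny hypothetical sketch of the difference:

public class AssertVsCheck {
  public static void main(String[] args) throws java.io.IOException {
    Object nn = null;  // pretend no NN has been acknowledged as active

    assert nn != null;  // no-op without -ea: execution continues past it

    if (nn == null) {   // always enforced, and fails with a clear message
      throw new java.io.IOException(
          "Unable to synchronize block, no NN acknowledged as active");
    }
  }
}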
@@ -2111,14 +2116,19 @@ public int getInfoPort(){

   /**
    * Returned information is a JSON representation of a map with
-   * name node host name as the key and block pool Id as the value
+   * name node host name as the key and block pool Id as the value.
+   * Note that, if there are multiple NNs in an HA nameservice,
+   * a given block pool may be represented twice.
    */
   @Override // DataNodeMXBean
   public String getNamenodeAddresses() {
     final Map<String, String> info = new HashMap<String, String>();
     for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
       if (bpos != null) {
-        info.put(bpos.getNNSocketAddress().getHostName(), bpos.getBlockPoolId());
+        for (BPServiceActor actor : bpos.getBPServiceActors()) {
+          info.put(actor.getNNSocketAddress().getHostName(),
+              bpos.getBlockPoolId());
+        }
       }
     }
     return JSON.toString(info);
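
For illustration, a small sketch (hypothetical host names and block pool ID) of the map shape the revised getNamenodeAddresses() now builds, with one entry per NN actor rather than one per block pool; the diff's JSON.toString(info) then serializes this map:

import java.util.HashMap;
import java.util.Map;

public class NamenodeAddressesShape {
  public static void main(String[] args) {
    Map<String, String> info = new HashMap<String, String>();
    // Two NNs of one HA nameservice: the same block pool ID appears
    // under two host-name keys.
    info.put("nn1.example.com", "BP-123456-10.0.0.1-1330000000000");
    info.put("nn2.example.com", "BP-123456-10.0.0.1-1330000000000");
    System.out.println(info);  // entry order unspecified for a HashMap
  }
}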
@@ -2167,11 +2177,18 @@ public void deleteBlockPool(String blockPoolId, boolean force)

   /**
    * @param addr rpc address of the namenode
-   * @return true - if BPOfferService corresponding to the namenode is alive
+   * @return true if the datanode is connected to a NameNode at the
+   * given address
    */
-  public boolean isBPServiceAlive(InetSocketAddress addr) {
-    BPOfferService bp = blockPoolManager.get(addr);
-    return bp != null ? bp.isAlive() : false;
+  public boolean isConnectedToNN(InetSocketAddress addr) {
+    for (BPOfferService bpos : getAllBpOs()) {
+      for (BPServiceActor bpsa : bpos.getBPServiceActors()) {
+        if (addr.equals(bpsa.getNNSocketAddress())) {
+          return bpsa.isAlive();
+        }
+      }
+    }
+    return false;
   }

   /**

View File

@@ -92,7 +92,7 @@ public synchronized boolean startUpgrade() throws IOException {
       "UpgradeManagerDatanode.currentUpgrades is not null.";
     assert upgradeDaemon == null :
       "UpgradeManagerDatanode.upgradeDaemon is not null.";
-    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
+    DatanodeProtocol nn = dataNode.getActiveNamenodeForBP(bpid);
     nn.processUpgradeCommand(broadcastCommand);
     return true;
   }

View File

@@ -45,7 +45,7 @@ protected DataNode getDatanode() {
   }

   protected DatanodeProtocol getNamenode() throws IOException {
-    return dataNode.getBPNamenode(bpid);
+    return dataNode.getActiveNamenodeForBP(bpid);
   }

   void setDatanode(DataNode dataNode, String bpid) {
@@ -92,14 +92,7 @@ boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
       + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
     DataNode.LOG.fatal( errorMsg );
     String bpid = nsInfo.getBlockPoolID();
-    DatanodeProtocol nn = dataNode.getBPNamenode(bpid);
-    try {
-      nn.errorReport(dataNode.getDNRegistrationForBP(bpid),
-          DatanodeProtocol.NOTIFY, errorMsg);
-    } catch(SocketTimeoutException e) { // namenode is busy
-      DataNode.LOG.info("Problem connecting to server: "
-          + dataNode.getNameNodeAddr(nsInfo.getBlockPoolID()));
-    }
+    dataNode.trySendErrorReport(bpid, DatanodeProtocol.NOTIFY, errorMsg);
     throw new IOException(errorMsg);
   }

View File

@@ -282,18 +282,13 @@ INode unprotectedAddFile( String path,
       newNode = new INodeFile(permissions, 0, replication,
                               modificationTime, atime, preferredBlockSize);
     }
-    writeLock(); // TODO: this is silly, considering the assert above!
-    try {
-      try {
-        newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
-      } catch (IOException e) {
-        return null;
-      }
-      return newNode;
-    } finally {
-      writeUnlock();
-    }
+
+    try {
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
+    } catch (IOException e) {
+      return null;
+    }
+    return newNode;
   }

   INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,

View File

@@ -266,8 +266,8 @@ private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       // Now close the file
       INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
-      // TODO: we could use removeLease(holder, path) here, but OP_CLOSE
-      // doesn't seem to serialize the holder... unclear why!
+      // One might expect that you could use removeLease(holder, path) here,
+      // but OP_CLOSE doesn't serialize the holder. So, remove by path.
       fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
       INodeFile newFile = ucFile.convertToInodeFile();
       fsDir.replaceNode(addCloseOp.path, ucFile, newFile);

View File

@@ -226,7 +226,6 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
       }
     }

-    // TODO(HA): Have to figure out a story for the first 3 of these.
     // 3. Do transitions
     switch(startOpt) {
     case UPGRADE:
@@ -261,7 +260,6 @@ private boolean recoverStorageDirs(StartupOption startOpt,
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, storage);
-        // TODO(HA): Fix this.
         String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
         if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
           throw new IOException("Cannot start an HA namenode with name dirs " +
@@ -637,8 +635,6 @@ boolean loadFSImage(FSNamesystem target) throws IOException {
     // update the txid for the edit log
     editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);

-    // TODO(HA): This should probably always return false when HA is enabled and
-    // we're coming up in standby state.
     return needToSave;
   }
@@ -697,8 +693,6 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      // TODO(HA): this may be very slow -- we probably want to
-      // update them as we go for HA.
       target.dir.updateCountForINodeWithQuota();
     }

View File

@@ -533,7 +533,6 @@ void startActiveServices() throws IOException {
       if (!editLog.isOpenForWrite()) {
         // During startup, we're already open for write during initialization.
-        // TODO(HA): consider adding a startup state?
         editLog.initJournalsForWrite();
         // May need to recover
         editLog.recoverUnclosedStreams();
@@ -912,7 +911,6 @@ void close() {
     } finally {
       // using finally to ensure we also wait for lease daemon
       try {
-        // TODO: these lines spew lots of warnings about "already stopped" logs, etc
        stopActiveServices();
        stopStandbyServices();
        if (dir != null) {

View File

@@ -920,7 +920,7 @@ synchronized void monitorHealth()
     if (!haEnabled) {
       return; // no-op, if HA is not enabled
     }
-    // TODO:HA implement health check
+    // TODO(HA): implement health check
     return;
   }
@@ -963,7 +963,7 @@ synchronized boolean readyToBecomeActive()
   /**
    * Class used to expose {@link NameNode} as context to {@link HAState}
    *
-   * TODO:HA
+   * TODO(HA):
    * When entering and exiting state, on failing to start services,
    * appropriate action is needed to either shut down the node or recover
    * from failure.
@@ -1005,7 +1005,6 @@ public void prepareToStopStandbyServices() throws ServiceFailedException {
   @Override
   public void stopStandbyServices() throws IOException {
-    // TODO(HA): Are we guaranteed to be the only active here?
     if (namesystem != null) {
       namesystem.stopStandbyServices();
     }

View File

@@ -176,7 +176,7 @@ private void doCheckpoint() throws InterruptedException, IOException {
   public void cancelAndPreventCheckpoints() throws ServiceFailedException {
     try {
       thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
-      // TODO: there is a really narrow race here if we are just
+      // TODO(HA): there is a really narrow race here if we are just
       // about to start a checkpoint - this won't cancel it!
       namesystem.getFSImage().cancelSaveNamespace(
           "About to exit standby state");

View File

@@ -1696,9 +1696,9 @@ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
     // If a datanode failed to start, then do not wait
     for (DataNodeProperties dn : dataNodes) {
       // the datanode thread communicating with the namenode should be alive
-      if (!dn.datanode.isBPServiceAlive(addr)) {
-        LOG.warn("BPOfferService failed to start in datanode " + dn.datanode
-            + " for namenode at " + addr);
+      if (!dn.datanode.isConnectedToNN(addr)) {
+        LOG.warn("BPOfferService in datanode " + dn.datanode
+            + " failed to connect to namenode at " + addr);
         return false;
       }
     }

View File

@@ -461,7 +461,7 @@ public void testZeroLenReplicas() throws IOException, InterruptedException {
         initReplicaRecovery(any(RecoveringBlock.class));
     Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
     d.join();
-    DatanodeProtocol dnP = dn.getBPNamenode(POOL_ID);
+    DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
     verify(dnP).commitBlockSynchronization(
         block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
   }
@@ -518,7 +518,7 @@ public void testNoReplicaUnderRecovery() throws IOException {
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
     }
-    DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID);
+    DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
     verify(namenode, never()).commitBlockSynchronization(
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         anyBoolean(), any(DatanodeID[].class));
@@ -547,7 +547,7 @@ public void testNotMatchedReplicaID() throws IOException {
     } catch (IOException e) {
       e.getMessage().startsWith("Cannot recover ");
     }
-    DatanodeProtocol namenode = dn.getBPNamenode(POOL_ID);
+    DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
     verify(namenode, never()).commitBlockSynchronization(
         any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
         anyBoolean(), any(DatanodeID[].class));

View File

@@ -23,6 +23,8 @@
 import static org.junit.Assert.assertNotSame;

 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.Map;

 import org.apache.commons.logging.Log;
@@ -99,15 +101,15 @@ public void test2NNRegistration() throws IOException {
       BPOfferService bpos2 = dn.getAllBpOs()[1];

       // The order of bpos is not guaranteed, so fix the order
-      if (bpos1.getNNSocketAddress().equals(nn2.getNameNodeAddress())) {
+      if (getNNSocketAddress(bpos1).equals(nn2.getNameNodeAddress())) {
         BPOfferService tmp = bpos1;
         bpos1 = bpos2;
         bpos2 = tmp;
       }

-      assertEquals("wrong nn address", bpos1.getNNSocketAddress(),
+      assertEquals("wrong nn address", getNNSocketAddress(bpos1),
           nn1.getNameNodeAddress());
-      assertEquals("wrong nn address", bpos2.getNNSocketAddress(),
+      assertEquals("wrong nn address", getNNSocketAddress(bpos2),
           nn2.getNameNodeAddress());
       assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
       assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
@@ -122,6 +124,12 @@ public void test2NNRegistration() throws IOException {
     }
   }

+  private static InetSocketAddress getNNSocketAddress(BPOfferService bpos) {
+    List<BPServiceActor> actors = bpos.getBPServiceActors();
+    assertEquals(1, actors.size());
+    return actors.get(0).getNNSocketAddress();
+  }
+
   /**
    * starts single nn and single dn and verifies registration and handshake
    *
@@ -154,14 +162,16 @@ public void testFedSingleNN() throws IOException {
       for (BPOfferService bpos : dn.getAllBpOs()) {
         LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name + "; sid="
-            + bpos.bpRegistration.storageID + "; nna=" + bpos.getNNSocketAddress());
+            + bpos.bpRegistration.storageID + "; nna=" +
+            getNNSocketAddress(bpos));
       }

       // try block report
       BPOfferService bpos1 = dn.getAllBpOs()[0];
       bpos1.triggerBlockReportForTests();

-      assertEquals("wrong nn address", bpos1.getNNSocketAddress(),
+      assertEquals("wrong nn address",
+          getNNSocketAddress(bpos1),
           nn1.getNameNodeAddress());
       assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
       assertEquals("wrong cid", dn.getClusterId(), cid1);

View File

@@ -22,15 +22,18 @@

 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Set;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
 import org.junit.Test;

+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+
 /**
  * Tests datanode refresh namenode list functionality.
  */
@@ -65,21 +68,24 @@ public void testRefreshNamenodes() throws IOException {
       cluster.addNameNode(conf, nnPort4);

-      BPOfferService[] bpoList = dn.getAllBpOs();
       // Ensure a BPOfferService in the datanodes corresponds to
       // a namenode in the cluster
+      Set<InetSocketAddress> nnAddrsFromCluster = Sets.newHashSet();
       for (int i = 0; i < 4; i++) {
-        InetSocketAddress addr = cluster.getNameNode(i).getNameNodeAddress();
-        boolean found = false;
-        for (int j = 0; j < bpoList.length; j++) {
-          if (bpoList[j] != null && addr.equals(bpoList[j].getNNSocketAddress())) {
-            found = true;
-            bpoList[j] = null; // Erase the address that matched
-            break;
-          }
-        }
-        assertTrue("NameNode address " + addr + " is not found.", found);
+        assertTrue(nnAddrsFromCluster.add(
+            cluster.getNameNode(i).getNameNodeAddress()));
       }
+
+      Set<InetSocketAddress> nnAddrsFromDN = Sets.newHashSet();
+      for (BPOfferService bpos : dn.getAllBpOs()) {
+        for (BPServiceActor bpsa : bpos.getBPServiceActors()) {
+          assertTrue(nnAddrsFromDN.add(bpsa.getNNSocketAddress()));
+        }
+      }
+
+      assertEquals("",
+          Joiner.on(",").join(
+            Sets.symmetricDifference(nnAddrsFromCluster, nnAddrsFromDN)));
     } finally {
       if (cluster != null) {
         cluster.shutdown();
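
As an aside, a minimal sketch (hypothetical addresses) of why the rewritten assertion joins the symmetric difference: when the two sets agree it joins to the empty string, and any mismatch surfaces verbatim in the failure message.

import java.util.Set;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;

public class SymmetricDiffSketch {
  public static void main(String[] args) {
    Set<String> fromCluster = Sets.newHashSet("nn1:8020", "nn2:8020");
    Set<String> fromDN = Sets.newHashSet("nn1:8020", "nn3:8020");
    // Elements in exactly one of the two sets; empty when they agree.
    String diff = Joiner.on(",").join(
        Sets.symmetricDifference(fromCluster, fromDN));
    System.out.println("'" + diff + "'");
    // e.g. 'nn2:8020,nn3:8020' (iteration order unspecified)
  }
}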

View File

@@ -179,7 +179,7 @@ public void testWriteOverFailoverWithDnFail() throws Exception {

       // write another block and a half
       AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
-      stm.hflush(); // TODO: see above
+      stm.hflush();

       LOG.info("Failing back to NN 0");
       cluster.transitionToStandby(0);
@@ -188,7 +188,7 @@ public void testWriteOverFailoverWithDnFail() throws Exception {
       cluster.stopDataNode(1);

       AppendTestUtil.write(stm, BLOCK_AND_A_HALF*2, BLOCK_AND_A_HALF);
-      stm.hflush(); // TODO: see above
+      stm.hflush();

       stm.close();

View File

@@ -127,9 +127,6 @@ public void testBothNodesInStandbyState() throws Exception {
     List<File> dirs = Lists.newArrayList();
     dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
     dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
-    // TODO: this failed once because it caught a ckpt file -- maybe
-    // this is possible if one of the NNs is really fast and the other is slow?
-    // need to loop this to suss out the race.
     FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.<String>of());
   }