HDFS-3438. BootstrapStandby should not require a rollEdits on active node. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1340343 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-19 05:13:38 +00:00
parent 20f935eaea
commit d1a54b872d
8 changed files with 97 additions and 84 deletions

View File

@ -190,6 +190,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3440. More effectively limit stream memory consumption when reading HDFS-3440. More effectively limit stream memory consumption when reading
corrupt edit logs (Colin Patrick McCabe via todd) corrupt edit logs (Colin Patrick McCabe via todd)
HDFS-3438. BootstrapStandby should not require a rollEdits on active node
(todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
@ -105,6 +107,20 @@ public class NamenodeProtocolServerSideTranslatorPB implements
return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build(); return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
} }
@Override
public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
throws ServiceException {
long txid;
try {
txid = impl.getMostRecentCheckpointTxId();
} catch (IOException e) {
throw new ServiceException(e);
}
return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
}
@Override @Override
public RollEditLogResponseProto rollEditLog(RpcController unused, public RollEditLogResponseProto rollEditLog(RpcController unused,
RollEditLogRequestProto request) throws ServiceException { RollEditLogRequestProto request) throws ServiceException {

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportR
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto; import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
@ -119,6 +120,16 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
} }
} }
@Override
public long getMostRecentCheckpointTxId() throws IOException {
try {
return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override @Override
public CheckpointSignature rollEditLog() throws IOException { public CheckpointSignature rollEditLog() throws IOException {
try { try {

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -712,8 +711,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // NamenodeProtocol @Override // NamenodeProtocol
public long getTransactionID() throws IOException { public long getTransactionID() throws IOException {
namesystem.checkOperation(OperationCategory.CHECKPOINT); namesystem.checkOperation(OperationCategory.UNCHECKED);
return namesystem.getEditLog().getSyncTxId(); return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
}
@Override // NamenodeProtocol
public long getMostRecentCheckpointTxId() throws IOException {
namesystem.checkOperation(OperationCategory.UNCHECKED);
return namesystem.getFSImage().getMostRecentCheckpointTxId();
} }
@Override // NamenodeProtocol @Override // NamenodeProtocol

View File

@ -33,16 +33,10 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -52,10 +46,8 @@ import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin; import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
@ -90,7 +82,7 @@ public class BootstrapStandby implements Tool, Configurable {
// Exit/return codes. // Exit/return codes.
static final int ERR_CODE_FAILED_CONNECT = 2; static final int ERR_CODE_FAILED_CONNECT = 2;
static final int ERR_CODE_INVALID_VERSION = 3; static final int ERR_CODE_INVALID_VERSION = 3;
static final int ERR_CODE_OTHER_NN_NOT_ACTIVE = 4; // Skip 4 - was used in previous versions, but no longer returned.
static final int ERR_CODE_ALREADY_FORMATTED = 5; static final int ERR_CODE_ALREADY_FORMATTED = 5;
static final int ERR_CODE_LOGS_UNAVAILABLE = 6; static final int ERR_CODE_LOGS_UNAVAILABLE = 6;
@ -142,12 +134,6 @@ public class BootstrapStandby implements Tool, Configurable {
.getProxy(); .getProxy();
} }
private HAServiceProtocol createHAProtocolProxy()
throws IOException {
return new NNHAServiceTarget(new HdfsConfiguration(conf), nsId, otherNNId)
.getProxy(conf, 15000);
}
private int doRun() throws IOException { private int doRun() throws IOException {
NamenodeProtocol proxy = createNNProtocolProxy(); NamenodeProtocol proxy = createNNProtocolProxy();
@ -184,29 +170,6 @@ public class BootstrapStandby implements Tool, Configurable {
" Layout version: " + nsInfo.getLayoutVersion() + "\n" + " Layout version: " + nsInfo.getLayoutVersion() + "\n" +
"====================================================="); "=====================================================");
// Ensure the other NN is active - we can't force it to roll edit logs
// below if it's not active.
if (!isOtherNNActive()) {
String err = "NameNode " + nsId + "." + nnId + " at " + otherIpcAddr +
" is not currently in ACTIVE state.";
if (!interactive) {
LOG.fatal(err + " Please transition it to " +
"active before attempting to bootstrap a standby node.");
return ERR_CODE_OTHER_NN_NOT_ACTIVE;
}
System.err.println(err);
if (ToolRunner.confirmPrompt(
"Do you want to automatically transition it to active now?")) {
transitionOtherNNActive();
} else {
LOG.fatal("User aborted. Exiting without bootstrapping standby.");
return ERR_CODE_OTHER_NN_NOT_ACTIVE;
}
}
// Check with the user before blowing away data. // Check with the user before blowing away data.
if (!NameNode.confirmFormat( if (!NameNode.confirmFormat(
Sets.union(Sets.newHashSet(dirsToFormat), Sets.union(Sets.newHashSet(dirsToFormat),
@ -215,11 +178,8 @@ public class BootstrapStandby implements Tool, Configurable {
return ERR_CODE_ALREADY_FORMATTED; return ERR_CODE_ALREADY_FORMATTED;
} }
// Force the active to roll its log long imageTxId = proxy.getMostRecentCheckpointTxId();
CheckpointSignature csig = proxy.rollEditLog(); long curTxId = proxy.getTransactionID();
long imageTxId = csig.getMostRecentCheckpointTxId();
long rollTxId = csig.getCurSegmentTxId();
// Format the storage (writes VERSION file) // Format the storage (writes VERSION file)
NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat); NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
@ -233,11 +193,11 @@ public class BootstrapStandby implements Tool, Configurable {
// Ensure that we have enough edits already in the shared directory to // Ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active. // start up from the last checkpoint on the active.
if (!checkLogsAvailableForRead(image, imageTxId, rollTxId)) { if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
return ERR_CODE_LOGS_UNAVAILABLE; return ERR_CODE_LOGS_UNAVAILABLE;
} }
image.getStorage().writeTransactionIdFileToStorage(rollTxId); image.getStorage().writeTransactionIdFileToStorage(curTxId);
// Download that checkpoint into our storage directories. // Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage( MD5Hash hash = TransferFsImage.downloadImageToStorage(
@ -248,31 +208,31 @@ public class BootstrapStandby implements Tool, Configurable {
} }
private void transitionOtherNNActive()
throws AccessControlException, ServiceFailedException, IOException {
LOG.info("Transitioning the running namenode to active...");
createHAProtocolProxy().transitionToActive();
LOG.info("Successful");
}
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId, private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
long rollTxId) { long curTxIdOnOtherNode) {
if (imageTxId == curTxIdOnOtherNode) {
// The other node hasn't written any logs since the last checkpoint.
// This can be the case if the NN was freshly formatted as HA, and
// then started in standby mode, so it has no edit logs at all.
return true;
}
long firstTxIdInLogs = imageTxId + 1; long firstTxIdInLogs = imageTxId + 1;
long lastTxIdInLogs = rollTxId - 1;
assert lastTxIdInLogs >= firstTxIdInLogs; assert curTxIdOnOtherNode >= firstTxIdInLogs :
"first=" + firstTxIdInLogs + " onOtherNode=" + curTxIdOnOtherNode;
try { try {
Collection<EditLogInputStream> streams = Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams( image.getEditLog().selectInputStreams(
firstTxIdInLogs, lastTxIdInLogs, false); firstTxIdInLogs, curTxIdOnOtherNode, true);
for (EditLogInputStream stream : streams) { for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream); IOUtils.closeStream(stream);
} }
return true; return true;
} catch (IOException e) { } catch (IOException e) {
String msg = "Unable to read transaction ids " + String msg = "Unable to read transaction ids " +
firstTxIdInLogs + "-" + lastTxIdInLogs + firstTxIdInLogs + "-" + curTxIdOnOtherNode +
" from the configured shared edits storage " + " from the configured shared edits storage " +
Joiner.on(",").join(sharedEditsUris) + ". " + Joiner.on(",").join(sharedEditsUris) + ". " +
"Please copy these logs into the shared edits storage " + "Please copy these logs into the shared edits storage " +
@ -291,12 +251,6 @@ public class BootstrapStandby implements Tool, Configurable {
return (nsInfo.getLayoutVersion() == HdfsConstants.LAYOUT_VERSION); return (nsInfo.getLayoutVersion() == HdfsConstants.LAYOUT_VERSION);
} }
private boolean isOtherNNActive()
throws AccessControlException, IOException {
HAServiceStatus status = createHAProtocolProxy().getServiceStatus();
return status.getState() == HAServiceState.ACTIVE;
}
private void parseConfAndFindOtherNN() throws IOException { private void parseConfAndFindOtherNN() throws IOException {
Configuration conf = getConf(); Configuration conf = getConf();
nsId = DFSUtil.getNamenodeNameServiceId(conf); nsId = DFSUtil.getNamenodeNameServiceId(conf);

View File

@ -87,11 +87,17 @@ public interface NamenodeProtocol {
/** /**
* @return The most recent transaction ID that has been synced to * @return The most recent transaction ID that has been synced to
* persistent storage. * persistent storage, or applied from persistent storage in the
* case of a non-active node.
* @throws IOException * @throws IOException
*/ */
public long getTransactionID() throws IOException; public long getTransactionID() throws IOException;
/**
* Get the transaction ID of the most recent checkpoint.
*/
public long getMostRecentCheckpointTxId() throws IOException;
/** /**
* Closes the current edit log and opens a new one. The * Closes the current edit log and opens a new one. The
* call fails if the file system is in SafeMode. * call fails if the file system is in SafeMode.

View File

@ -84,6 +84,16 @@ message RollEditLogResponseProto {
required CheckpointSignatureProto signature = 1; required CheckpointSignatureProto signature = 1;
} }
/**
* void request
*/
message GetMostRecentCheckpointTxIdRequestProto {
}
message GetMostRecentCheckpointTxIdResponseProto{
required uint64 txId = 1;
}
/** /**
* registration - Namenode reporting the error * registration - Namenode reporting the error
* errorCode - error code indicating the error * errorCode - error code indicating the error
@ -188,13 +198,19 @@ service NamenodeProtocolService {
rpc getTransactionId(GetTransactionIdRequestProto) rpc getTransactionId(GetTransactionIdRequestProto)
returns(GetTransactionIdResponseProto); returns(GetTransactionIdResponseProto);
/**
* Get the transaction ID of the most recently persisted editlog record
*/
rpc getMostRecentCheckpointTxId(GetMostRecentCheckpointTxIdRequestProto)
returns(GetMostRecentCheckpointTxIdResponseProto);
/** /**
* Close the current editlog and open a new one for checkpointing purposes * Close the current editlog and open a new one for checkpointing purposes
*/ */
rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto); rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto);
/** /**
* Close the current editlog and open a new one for checkpointing purposes * Request info about the version running on this NameNode
*/ */
rpc versionRequest(VersionRequestProto) returns(VersionResponseProto); rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage; import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After; import org.junit.After;
@ -43,6 +45,7 @@ import org.junit.Test;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -177,7 +180,7 @@ public class TestBootstrapStandby {
logs.stopCapturing(); logs.stopCapturing();
} }
GenericTestUtils.assertMatches(logs.getOutput(), GenericTestUtils.assertMatches(logs.getOutput(),
"FATAL.*Unable to read transaction ids 1-4 from the configured shared"); "FATAL.*Unable to read transaction ids 1-3 from the configured shared");
} }
@Test @Test
@ -195,30 +198,29 @@ public class TestBootstrapStandby {
assertEquals(0, rc); assertEquals(0, rc);
} }
/**
* Test that, even if the other node is not active, we are able
* to bootstrap standby from it.
*/
@Test(timeout=30000) @Test(timeout=30000)
public void testOtherNodeNotActive() throws Exception { public void testOtherNodeNotActive() throws Exception {
cluster.transitionToStandby(0); cluster.transitionToStandby(0);
int rc = BootstrapStandby.run( int rc = BootstrapStandby.run(
new String[]{"-nonInteractive"},
cluster.getConfiguration(1));
assertEquals(BootstrapStandby.ERR_CODE_OTHER_NN_NOT_ACTIVE, rc);
// Answer "yes" to the prompt about transition to active
System.setIn(new ByteArrayInputStream("yes\n".getBytes()));
rc = BootstrapStandby.run(
new String[]{"-force"}, new String[]{"-force"},
cluster.getConfiguration(1)); cluster.getConfiguration(1));
assertEquals(0, rc); assertEquals(0, rc);
assertFalse(nn0.getNamesystem().isInStandbyState());
} }
private void assertNNFilesMatch() throws Exception { private void assertNNFilesMatch() throws Exception {
List<File> curDirs = Lists.newArrayList(); List<File> curDirs = Lists.newArrayList();
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0)); curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
// Ignore seen_txid file, since the newly bootstrapped standby
// will have a higher seen_txid than the one it bootstrapped from.
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs, FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
Collections.<String>emptySet()); ignoredFiles);
} }
private void removeStandbyNameDirs() { private void removeStandbyNameDirs() {