HDFS-5869. When starting rolling upgrade or NN restarts, NN should create a checkpoint right before the upgrade marker.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1565516 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-02-07 02:08:05 +00:00
parent b2ce764093
commit c780454413
15 changed files with 155 additions and 58 deletions

CHANGES_HDFS-5535.txt

@@ -22,3 +22,6 @@ HDFS-5535 subtasks:
HDFS-5848. Add rolling upgrade status to heartbeat response. (szetszwo)
HDFS-5890. Avoid NPE in Datanode heartbeat. (Vinay via brandonli)
HDFS-5869. When starting rolling upgrade or NN restarts, NN should create
a checkpoint right before the upgrade marker. (szetszwo)

RollingUpgradeInfo.java

@@ -28,9 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RollingUpgradeInfo extends RollingUpgradeStatus {
public static final RollingUpgradeInfo EMPTY_INFO = new RollingUpgradeInfo(
null, 0);
private long startTime;
private long finalizeTime;

ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -690,9 +690,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try {
final RollingUpgradeInfo info = server.rollingUpgrade(
PBHelper.convert(req.getAction()));
return RollingUpgradeResponseProto.newBuilder()
.setRollingUpgradeInfo(PBHelper.convert(info))
.build();
final RollingUpgradeResponseProto.Builder b = RollingUpgradeResponseProto.newBuilder();
if (info != null) {
b.setRollingUpgradeInfo(PBHelper.convert(info));
}
return b.build();
} catch (IOException e) {
throw new ServiceException(e);
}

ClientNamenodeProtocolTranslatorPB.java

@@ -645,7 +645,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setAction(PBHelper.convert(action)).build();
try {
final RollingUpgradeResponseProto proto = rpcProxy.rollingUpgrade(null, r);
return PBHelper.convert(proto.getRollingUpgradeInfo());
if (proto.hasRollingUpgradeInfo()) {
return PBHelper.convert(proto.getRollingUpgradeInfo());
}
return null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

HdfsServerConstants.java

@@ -156,6 +156,16 @@ public final class HdfsServerConstants {
public void setInteractiveFormat(boolean interactive) {
isInteractiveFormat = interactive;
}
@Override
public String toString() {
if (this == ROLLINGUPGRADE) {
return new StringBuilder(super.toString())
.append("(").append(getRollingUpgradeStartupOption()).append(")")
.toString();
}
return super.toString();
}
}
// Timeouts for communicating with DataNode for streaming writes/reads

StorageInfo.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
/**
* Common class for storage information.
@@ -95,7 +94,6 @@ public class StorageInfo {
public long getCTime() { return cTime; }
public void setStorageInfo(StorageInfo from) {
Preconditions.checkArgument(from.storageType == storageType);
layoutVersion = from.layoutVersion;
clusterID = from.clusterID;
namespaceID = from.namespaceID;

FSEditLogLoader.java

@@ -97,9 +97,12 @@ import com.google.common.base.Preconditions;
@InterfaceStability.Evolving
public class FSEditLogLoader {
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
private final FSNamesystem fsNamesys;
private long lastAppliedTxId;
/** Total number of edit transactions loaded. */
private int totalEdits = 0;
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
this.fsNamesys = fsNamesys;
@@ -212,6 +215,10 @@ public class FSEditLogLoader {
}
}
try {
if (LOG.isTraceEnabled()) {
LOG.trace("op=" + op + ", startOpt=" + startOpt
+ ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
}
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(), lastInodeId);
if (lastInodeId < inodeId) {
@@ -250,6 +257,7 @@ public class FSEditLogLoader {
}
}
numEdits++;
totalEdits++;
} catch (UpgradeMarkerException e) {
LOG.info("Stopped at upgrade marker");
break;
@@ -671,15 +679,19 @@ public class FSEditLogLoader {
}
case OP_UPGRADE_MARKER: {
if (startOpt == StartupOption.ROLLINGUPGRADE) {
if (startOpt.getRollingUpgradeStartupOption()
== RollingUpgradeStartupOption.ROLLBACK) {
final RollingUpgradeStartupOption rollingUpgradeOpt
= startOpt.getRollingUpgradeStartupOption();
if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
throw new UpgradeMarkerException();
} else if (startOpt.getRollingUpgradeStartupOption()
== RollingUpgradeStartupOption.DOWNGRADE) {
} else if (rollingUpgradeOpt == RollingUpgradeStartupOption.DOWNGRADE) {
//ignore upgrade marker
break;
} else if (startOpt.getRollingUpgradeStartupOption()
== RollingUpgradeStartupOption.STARTED) {
} else if (rollingUpgradeOpt == RollingUpgradeStartupOption.STARTED) {
if (totalEdits > 1) {
// save namespace if this is not the second edit transaction
// (the first must be OP_START_LOG_SEGMENT)
fsNamesys.getFSImage().saveNamespace(fsNamesys);
}
//rolling upgrade is already started, set info
final UpgradeMarkerOp upgradeOp = (UpgradeMarkerOp)op;
fsNamesys.setRollingUpgradeInfo(upgradeOp.getStartTime());
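
The two loader hunks above implement the restart half of this change: replay now counts every transaction in totalEdits, and OP_UPGRADE_MARKER triggers a checkpoint unless the marker is the very first transaction after OP_START_LOG_SEGMENT (in which case the image on disk is already the checkpoint right before the marker). A condensed sketch of the marker handling, not the verbatim method body:

    // Sketch: how replay reacts to OP_UPGRADE_MARKER for each -rollingUpgrade sub-option.
    switch (startOpt.getRollingUpgradeStartupOption()) {
      case ROLLBACK:
        throw new UpgradeMarkerException();   // caller catches this and stops replay at the marker
      case DOWNGRADE:
        break;                                // ignore the marker, keep replaying
      case STARTED:
        if (totalEdits > 1) {
          // The marker is not the second transaction of the segment, so the image
          // on disk predates it: save the namespace right before the marker.
          fsNamesys.getFSImage().saveNamespace(fsNamesys);
        }
        fsNamesys.setRollingUpgradeInfo(((UpgradeMarkerOp) op).getStartTime());
        break;
    }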

FSImage.java

@@ -912,6 +912,7 @@ public class FSImage implements Closeable {
public synchronized void saveNamespace(FSNamesystem source,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
LOG.info("Save namespace ...");
storage.attemptRestoreRemovedStorage();
boolean editLogWasOpen = editLog.isSegmentOpen();

FSNamesystem.java

@@ -204,7 +204,6 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@@ -401,7 +400,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
private RollingUpgradeInfo rollingUpgradeInfo;
private RollingUpgradeInfo rollingUpgradeInfo = null;
// Block pool ID used by this namenode
private String blockPoolId;
@@ -622,8 +621,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @return an FSNamesystem which contains the loaded namespace
* @throws IOException if loading fails
*/
public static FSNamesystem loadFromDisk(Configuration conf)
throws IOException {
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
checkConfiguration(conf);
FSImage fsImage = new FSImage(conf,
@@ -636,10 +634,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
long loadStart = now();
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
try {
namesystem.loadFSImage(startOpt, fsImage,
HAUtil.isHAEnabled(conf, nameserviceId));
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
@@ -864,8 +860,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return Collections.unmodifiableList(auditLoggers);
}
void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
throws IOException {
private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
// format before starting up if requested
if (startOpt == StartupOption.FORMAT) {
@@ -878,8 +875,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
// We shouldn't be calling saveNamespace if we've come up in standby state.
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
boolean needToSave =
fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled;
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
LOG.info("Need to save fs image? " + needToSave
+ " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
+ ", isRollingUpgrade=" + isRollingUpgrade() + ")");
if (needToSave) {
fsImage.saveNamespace(this);
} else {
@@ -4494,6 +4495,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
checkRollingUpgrade("save namespace");
if (!isInSafeMode()) {
throw new IOException("Safe mode should be turned ON "
+ "in order to create namespace image.");
@@ -5317,8 +5320,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
checkNameNodeSafeMode("Checkpoint not started");
LOG.info("Start checkpoint for " + backupNode.getAddress());
cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
getEditLog().logSync();
@@ -7087,8 +7090,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.READ);
readLock();
try {
return rollingUpgradeInfo != null? rollingUpgradeInfo
: RollingUpgradeInfo.EMPTY_INFO;
return rollingUpgradeInfo;
} finally {
readUnlock();
}
@@ -7100,18 +7102,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
final String err = "Failed to start rolling upgrade";
checkNameNodeSafeMode(err);
final String action = "start rolling upgrade";
checkNameNodeSafeMode("Failed to " + action);
checkRollingUpgrade(action);
getFSImage().saveNamespace(this);
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
if (isRollingUpgrade()) {
throw new RollingUpgradeException(err
+ " since a rolling upgrade is already in progress."
+ "\nExisting rolling upgrade info: " + rollingUpgradeInfo);
}
final CheckpointSignature cs = getFSImage().rollEditLog();
LOG.info("Successfully rolled edit log for preparing rolling upgrade."
+ " Checkpoint signature: " + cs);
setRollingUpgradeInfo(now());
getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime());
} finally {
@@ -7134,6 +7131,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return rollingUpgradeInfo != null;
}
private void checkRollingUpgrade(String action) throws RollingUpgradeException {
if (isRollingUpgrade()) {
throw new RollingUpgradeException("Failed to " + action
+ " since a rolling upgrade is already in progress."
+ " Existing rolling upgrade info:\n" + rollingUpgradeInfo);
}
}
RollingUpgradeInfo finalizeRollingUpgrade() throws IOException {
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
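
These hunks cover the start half of the change: startRollingUpgrade() now saves a full checkpoint via saveNamespace() where it previously only rolled the edit log, and the new checkRollingUpgrade() guard also fences off saveNamespace (and, as the test below exercises, startCheckpoint) while an upgrade is in progress. A condensed sketch of the resulting start sequence, simplified from the hunks above:

    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Failed to start rolling upgrade");
      checkRollingUpgrade("start rolling upgrade"); // refuse if one is already running
      getFSImage().saveNamespace(this);             // the checkpoint right before the marker
      setRollingUpgradeInfo(now());
      getEditLog().logUpgradeMarker(rollingUpgradeInfo.getStartTime());
    } finally {
      writeUnlock();
    }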

NameNode.java

@@ -1158,7 +1158,7 @@ public class NameNode implements NameNodeStatusMXBean {
}
private static void setStartupOption(Configuration conf, StartupOption opt) {
conf.set(DFS_NAMENODE_STARTUP_KEY, opt.toString());
conf.set(DFS_NAMENODE_STARTUP_KEY, opt.name());
}
static StartupOption getStartupOption(Configuration conf) {
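
The switch from toString() to name() matters because of the new toString() override in HdfsServerConstants above: once the rolling-upgrade sub-option is set, toString() carries a suffix that would no longer match the plain enum constant when the value is presumably parsed back out of the configuration. An assert-style illustration (not code from the patch):

    StartupOption opt = StartupOption.ROLLINGUPGRADE;
    opt.setRollingUpgradeStartupOption("started");
    assert opt.name().equals("ROLLINGUPGRADE");              // stable key, safe to store in conf
    assert opt.toString().equals("ROLLINGUPGRADE(STARTED)"); // suffixed by the new override
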
@@ -1188,7 +1188,7 @@ public class NameNode implements NameNodeStatusMXBean {
FSNamesystem fsn = null;
try {
fsn = FSNamesystem.loadFromDisk(conf);
fsn.saveNamespace();
fsn.getFSImage().saveNamespace(fsn);
MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
} catch (IOException e) {
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
@@ -1204,6 +1204,7 @@ public class NameNode implements NameNodeStatusMXBean {
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
conf = new HdfsConfiguration();
// Parse out some generic args into Configuration.

DFSAdmin.java

@@ -307,7 +307,7 @@ public class DFSAdmin extends FsShell {
final RollingUpgradeInfo info = dfs.rollingUpgrade(action);
switch(action){
case QUERY:
if (info.isStarted()) {
if (info != null && info.isStarted()) {
System.out.println("Rolling upgrade is in progress:");
System.out.println(info);
} else {

ClientNamenodeProtocol.proto

@@ -349,7 +349,7 @@ message RollingUpgradeInfoProto {
}
message RollingUpgradeResponseProto {
required RollingUpgradeInfoProto rollingUpgradeInfo= 1;
optional RollingUpgradeInfoProto rollingUpgradeInfo= 1;
}
message ListCorruptFileBlocksRequestProto {
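
Making the field optional is what allows the null-handling chain above: the server-side translator only sets rollingUpgradeInfo when the server returned non-null, the client-side translator returns null unless hasRollingUpgradeInfo() is true, and the RollingUpgradeInfo.EMPTY_INFO placeholder becomes unnecessary. From a caller's point of view, a minimal sketch (dfs is a DistributedFileSystem set up elsewhere):

    final RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
    if (info != null && info.isStarted()) {
      System.out.println("Rolling upgrade is in progress:\n" + info);
    } else {
      System.out.println("There is no rolling upgrade in progress."); // info may now be null
    }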

MiniDFSCluster.java

@@ -1543,14 +1543,6 @@ public class MiniDFSCluster {
waitActive();
}
/**
* Restart the namenode.
*/
public synchronized void restartNameNode() throws IOException {
checkSingleNameNode();
restartNameNode(true);
}
/**
* Restart the namenode.
*/

TestRollingUpgrade.java

@@ -26,10 +26,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.Assert;
import org.junit.Test;
@@ -111,6 +118,7 @@ public class TestRollingUpgrade {
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, dir.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
conf.setLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 0L);
return conf;
}
@@ -120,6 +128,9 @@ public class TestRollingUpgrade {
final File nn1Dir = new File(nnDirPrefix + "image1");
final File nn2Dir = new File(nnDirPrefix + "image2");
LOG.info("nn1Dir=" + nn1Dir);
LOG.info("nn2Dir=" + nn2Dir);
final Configuration conf = new HdfsConfiguration();
final MiniJournalCluster mjc = new MiniJournalCluster.Builder(conf).build();
setConf(conf, nn1Dir, mjc);
@@ -155,6 +166,7 @@ public class TestRollingUpgrade {
final Path foo = new Path("/foo");
final Path bar = new Path("/bar");
final Path baz = new Path("/baz");
final RollingUpgradeInfo info1;
{
@@ -163,17 +175,35 @@ public class TestRollingUpgrade {
//start rolling upgrade
info1 = dfs.rollingUpgrade(RollingUpgradeAction.START);
LOG.info("start " + info1);
LOG.info("START\n" + info1);
//query rolling upgrade
Assert.assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
dfs.mkdirs(bar);
//save namespace should fail
try {
dfs.saveNamespace();
Assert.fail();
} catch(RemoteException re) {
Assert.assertEquals(RollingUpgradeException.class.getName(),
re.getClassName());
LOG.info("The exception is expected.", re);
}
//start checkpoint should fail
try {
NameNodeAdapter.startCheckpoint(cluster.getNameNode(), null, null);
Assert.fail();
} catch(RollingUpgradeException re) {
LOG.info("The exception is expected.", re);
}
}
// cluster2 takes over QJM
final Configuration conf2 = setConf(new Configuration(), nn2Dir, mjc);
//set startup option to downgrade for ignoring upgrade marker in editlog
// use "started" option for ignoring upgrade marker in editlog
StartupOption.ROLLINGUPGRADE.setRollingUpgradeStartupOption("started");
cluster2 = new MiniDFSCluster.Builder(conf2)
.numDataNodes(0)
@@ -186,13 +216,50 @@ public class TestRollingUpgrade {
// Check that cluster2 sees the edits made on cluster1
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
Assert.assertFalse(dfs2.exists(baz));
//query rolling upgrade in cluster2
Assert.assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
LOG.info("finalize: " + dfs2.rollingUpgrade(RollingUpgradeAction.FINALIZE));
dfs2.mkdirs(baz);
LOG.info("RESTART cluster 2 with -rollingupgrade started");
cluster2.restartNameNode();
Assert.assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
Assert.assertTrue(dfs2.exists(baz));
LOG.info("RESTART cluster 2 with -rollingupgrade started again");
cluster2.restartNameNode();
Assert.assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
Assert.assertTrue(dfs2.exists(baz));
//finalize rolling upgrade
final RollingUpgradeInfo finalize = dfs2.rollingUpgrade(RollingUpgradeAction.FINALIZE);
LOG.info("FINALIZE: " + finalize);
Assert.assertEquals(info1.getStartTime(), finalize.getStartTime());
LOG.info("RESTART cluster 2 with regular startup option");
cluster2.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
cluster2.restartNameNode();
Assert.assertTrue(dfs2.exists(foo));
Assert.assertTrue(dfs2.exists(bar));
Assert.assertTrue(dfs2.exists(baz));
} finally {
if (cluster2 != null) cluster2.shutdown();
}
}
@Test(expected=ExitException.class)
public void testSecondaryNameNode() throws Exception {
ExitUtil.disableSystemExit();
final String[] args = {
StartupOption.ROLLINGUPGRADE.getName(),
RollingUpgradeStartupOption.STARTED.name()};
SecondaryNameNode.main(args);
}
}

NameNodeAdapter.java

@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import java.io.File;
@@ -41,6 +39,8 @@ import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
@@ -238,5 +238,11 @@ public class NameNodeAdapter {
public static File getInProgressEditsFile(StorageDirectory sd, long startTxId) {
return NNStorage.getInProgressEditsFile(sd, startTxId);
}
public static NamenodeCommand startCheckpoint(NameNode nn,
NamenodeRegistration backupNode, NamenodeRegistration activeNamenode)
throws IOException {
return nn.getNamesystem().startCheckpoint(backupNode, activeNamenode);
}
}