HDFS-6000. Avoid saving namespace when starting rolling upgrade. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1571840 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-02-25 21:58:53 +00:00
parent abc9a6dad5
commit e3d2e4c156
19 changed files with 317 additions and 131 deletions

View File

@ -97,3 +97,5 @@ HDFS-5535 subtasks:
HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)
HDFS-5498. Improve datanode startup time. (kihwal)
HDFS-6000. Avoid saving namespace when starting rolling upgrade. (jing9)

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public class RollingUpgradeInfo extends RollingUpgradeStatus {
private final long startTime;
private final long finalizeTime;
private final boolean createdRollbackImages;
private boolean createdRollbackImages;
public RollingUpgradeInfo(String blockPoolId, boolean createdRollbackImages,
long startTime, long finalizeTime) {
@ -44,6 +44,10 @@ public class RollingUpgradeInfo extends RollingUpgradeStatus {
return createdRollbackImages;
}
/**
* Updates the flag that records whether rollback images have been created.
*
* @param created the new value of the createdRollbackImages flag
*/
public void setCreatedRollbackImages(boolean created) {
this.createdRollbackImages = created;
}
public boolean isStarted() {
return startTime != 0;
}

View File

@ -93,6 +93,12 @@ public final class HdfsServerConstants {
== RollingUpgradeStartupOption.ROLLBACK;
}
/**
 * Tells whether the given startup option is ROLLINGUPGRADE carrying the
 * DOWNGRADE rolling-upgrade sub-option.
 *
 * @param option the startup option to examine
 * @return true only for a ROLLINGUPGRADE option whose rolling-upgrade
 *         startup option is DOWNGRADE
 */
public static boolean isRollingUpgradeDowngrade(StartupOption option) {
  if (option != ROLLINGUPGRADE) {
    return false;
  }
  return option.getRollingUpgradeStartupOption()
      == RollingUpgradeStartupOption.DOWNGRADE;
}
private final String name;
// Used only with format and upgrade options

View File

@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@ -220,9 +221,9 @@ class Checkpointer extends Daemon {
LOG.info("Unable to roll forward using only logs. Downloading " +
"image with txid " + sig.mostRecentCheckpointTxId);
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
bnStorage, true);
bnImage.saveDigestAndRenameCheckpointImage(
backupNode.nnHttpAddress, NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, bnStorage, true);
bnImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
lastApplied = sig.mostRecentCheckpointTxId;
needReloadImage = true;
@ -240,7 +241,8 @@ class Checkpointer extends Daemon {
if(needReloadImage) {
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
File file = bnStorage.findImageFile(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId);
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
}
rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
@ -263,7 +265,7 @@ class Checkpointer extends Daemon {
if(cpCmd.needToReturnImage()) {
TransferFsImage.uploadImageFromStorage(
backupNode.nnHttpAddress, getImageListenAddress(),
bnStorage, txid);
bnStorage, NameNodeFile.IMAGE, txid);
}
getRemoteNamenodeProxy().endCheckpoint(backupNode.getRegistration(), sig);

View File

@ -727,10 +727,10 @@ public class FSEditLogLoader {
break;
}
}
// save namespace if there is no rollback image existing
// start rolling upgrade
final long startTime = ((RollingUpgradeOp) op).getTime();
fsNamesys.startRollingUpgradeInternal(startTime, op.txid - 2);
fsNamesys.startRollingUpgradeInternal(startTime);
fsNamesys.triggerRollbackCheckpoint();
break;
}
case OP_ROLLING_UPGRADE_FINALIZE: {

View File

@ -21,11 +21,13 @@ import static org.apache.hadoop.util.Time.now;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -328,18 +330,19 @@ public class FSImage implements Closeable {
}
/**
* @return true if there is rollback fsimage (for rolling upgrade) for the
* given txid in storage.
* @return true if there is rollback fsimage (for rolling upgrade) in NameNode
* directory.
*/
boolean hasRollbackFSImage(long txid) {
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
final File rollbackImageFile = NNStorage.getStorageFile(sd,
NameNodeFile.IMAGE_ROLLBACK, txid);
if (rollbackImageFile.exists()) {
return true;
}
public boolean hasRollbackFSImage() throws IOException {
final FSImageStorageInspector inspector = new FSImageTransactionalStorageInspector(
EnumSet.of(NameNodeFile.IMAGE_ROLLBACK));
storage.inspectStorageDirs(inspector);
try {
List<FSImageFile> images = inspector.getLatestImages();
return images != null && !images.isEmpty();
} catch (FileNotFoundException e) {
return false;
}
return false;
}
void doUpgrade(FSNamesystem target) throws IOException {
@ -566,9 +569,15 @@ public class FSImage implements Closeable {
throws IOException {
final boolean rollingRollback = StartupOption
.isRollingUpgradeRollback(startOpt);
final NameNodeFile nnf = rollingRollback ? NameNodeFile.IMAGE_ROLLBACK
: NameNodeFile.IMAGE;
final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnf);
final EnumSet<NameNodeFile> nnfs;
if (rollingRollback) {
// if it is rollback of rolling upgrade, only load from the rollback image
nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
} else {
// otherwise we can load from both IMAGE and IMAGE_ROLLBACK
nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
}
final FSImageStorageInspector inspector = storage.readAndInspectDirs(nnfs);
isUpgradeFinalized = inspector.isUpgradeFinalized();
List<FSImageFile> imageFiles = inspector.getLatestImages();
@ -643,6 +652,10 @@ public class FSImage implements Closeable {
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
if (StartupOption.isRollingUpgradeDowngrade(startOpt)) {
// purge rollback image if it is downgrade
archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
}
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
@ -973,10 +986,10 @@ public class FSImage implements Closeable {
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
* @param canceler
* @param canceler
*/
public synchronized void saveNamespace(FSNamesystem source,
NameNodeFile nnf, Canceler canceler) throws IOException {
public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
LOG.info("Save namespace ...");
storage.attemptRestoreRemovedStorage();
@ -1222,13 +1235,13 @@ public class FSImage implements Closeable {
* renames the image from fsimage_N.ckpt to fsimage_N and also
* saves the related .md5 file into place.
*/
public synchronized void saveDigestAndRenameCheckpointImage(
public synchronized void saveDigestAndRenameCheckpointImage(NameNodeFile nnf,
long txid, MD5Hash digest) throws IOException {
// Write and rename MD5 file
List<StorageDirectory> badSds = Lists.newArrayList();
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
File imageFile = NNStorage.getImageFile(sd, txid);
File imageFile = NNStorage.getImageFile(sd, nnf, txid);
try {
MD5FileUtils.saveMD5File(imageFile, digest);
} catch (IOException ioe) {
@ -1240,7 +1253,7 @@ public class FSImage implements Closeable {
CheckpointFaultInjector.getInstance().afterMD5Rename();
// Rename image from tmp file
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, NameNodeFile.IMAGE, false);
renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
// So long as this is the newest image available,
// advertise it as such to other checkpointers
// from now on

View File

@ -280,7 +280,10 @@ public final class FSImageFormatProtobuf {
fsn.setGenerationStampV1Limit(s.getGenstampV1Limit());
fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
imgTxId = s.getTransactionId();
if (s.hasRollingUpgradeStartTime()) {
if (s.hasRollingUpgradeStartTime()
&& fsn.getFSImage().hasRollbackFSImage()) {
// we set the rollingUpgradeInfo only when we make sure we have the
// rollback image
fsn.setRollingUpgradeInfo(true, s.getRollingUpgradeStartTime());
}
}

View File

@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
@ -33,7 +34,9 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
public static final Log LOG = LogFactory.getLog(
@ -45,14 +48,27 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
private long maxSeenTxId = 0;
private final Pattern namePattern;
private final List<Pattern> namePatterns = Lists.newArrayList();
FSImageTransactionalStorageInspector() {
this(NameNodeFile.IMAGE);
this(EnumSet.of(NameNodeFile.IMAGE));
}
FSImageTransactionalStorageInspector(NameNodeFile nnf) {
namePattern = Pattern.compile(nnf.getName() + "_(\\d+)");
/**
 * Builds an inspector that recognizes image files of the given types. One
 * pattern of the form {@code <name>_<digits>} is registered per file type.
 *
 * @param nnfs the image file types whose names should be matched
 */
FSImageTransactionalStorageInspector(EnumSet<NameNodeFile> nnfs) {
  for (NameNodeFile fileType : nnfs) {
    namePatterns.add(Pattern.compile(fileType.getName() + "_(\\d+)"));
  }
}
/**
 * Tries each registered name pattern, in order, against a file name.
 *
 * @param name the file name to test
 * @return the matcher of the first pattern that matches the whole name, or
 *         null when no pattern matches
 */
private Matcher matchPattern(String name) {
  Matcher found = null;
  for (Pattern candidate : namePatterns) {
    final Matcher attempt = candidate.matcher(name);
    if (attempt.matches()) {
      found = attempt;
      break;
    }
  }
  return found;
}
@Override
@ -88,8 +104,8 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
String name = f.getName();
// Check for fsimage_*
Matcher imageMatch = namePattern.matcher(name);
if (imageMatch.matches()) {
Matcher imageMatch = this.matchPattern(name);
if (imageMatch != null) {
if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
try {
long txid = Long.valueOf(imageMatch.group(1));

View File

@ -885,6 +885,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
if (StartupOption.isRollingUpgradeRollback(startOpt)) {
rollingUpgradeInfo = null;
}
final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
LOG.info("Need to save fs image? " + needToSave
+ " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
@ -1141,6 +1144,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
/**
 * Asks the standby checkpointer to produce a rollback checkpoint. Called when
 * the NN is in Standby state and the editlog tailer tails the
 * OP_ROLLING_UPGRADE_START. Does nothing when there is no standby
 * checkpointer.
 */
void triggerRollbackCheckpoint() {
  if (standbyCheckpointer == null) {
    return;
  }
  standbyCheckpointer.triggerRollbackCheckpoint();
}
/**
* Called while the NN is in Standby state, but just about to be
@ -7131,6 +7143,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.READ);
readLock();
try {
if (rollingUpgradeInfo != null) {
boolean hasRollbackImage = this.getFSImage().hasRollbackFSImage();
rollingUpgradeInfo.setCreatedRollbackImages(hasRollbackImage);
}
return rollingUpgradeInfo;
} finally {
readUnlock();
@ -7143,15 +7159,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Failed to start rolling upgrade");
startRollingUpgradeInternal(now(), -1);
long startTime = now();
if (!haEnabled) { // for non-HA, we require NN to be in safemode
startRollingUpgradeInternalForNonHA(startTime);
} else { // for HA, NN cannot be in safemode
checkNameNodeSafeMode("Failed to start rolling upgrade");
startRollingUpgradeInternal(startTime);
}
getEditLog().logStartRollingUpgrade(rollingUpgradeInfo.getStartTime());
if (haEnabled) {
// roll the edit log to make sure the standby NameNode can tail
getFSImage().rollEditLog();
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "startRollingUpgrade", null, null, null);
}
@ -7160,19 +7185,35 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Update internal state to indicate that a rolling upgrade is in progress.
* Optionally create a checkpoint before starting the RU.
* @param startTime
*/
void startRollingUpgradeInternal(long startTime, long txid)
void startRollingUpgradeInternal(long startTime)
throws IOException {
checkRollingUpgrade("start rolling upgrade");
getFSImage().checkUpgrade(this);
setRollingUpgradeInfo(false, startTime);
}
// if we have not made a rollback image, do it
if (txid < 0 || !getFSImage().hasRollbackFSImage(txid)) {
getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
/**
* Update internal state to indicate that a rolling upgrade is in progress for
* a non-HA setup. The namesystem must already be in SafeMode; once the
* rollback checkpoint has been saved, the namesystem leaves SafeMode
* automatically.
*
* @param startTime the start time recorded in the rolling upgrade info
* @throws IOException if the namesystem is not in SafeMode, or when
*         propagated from the checks and the namespace save below
*/
private void startRollingUpgradeInternalForNonHA(long startTime)
throws IOException {
// this path is only valid when HA is disabled
Preconditions.checkState(!haEnabled);
if (!isInSafeMode()) {
throw new IOException("Safe mode should be turned ON "
+ "in order to create namespace image.");
}
checkRollingUpgrade("start rolling upgrade");
getFSImage().checkUpgrade(this);
// in non-HA setup, we do an extra ckpt to generate a rollback image
getFSImage().saveNamespace(this, NameNodeFile.IMAGE_ROLLBACK, null);
LOG.info("Successfully saved namespace for preparing rolling upgrade.");
// leave SafeMode automatically
setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
// the rollback image was written just above, so mark it as created
setRollingUpgradeInfo(true, startTime);
}
@ -7181,7 +7222,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
createdRollbackImages, startTime, 0L);
}
RollingUpgradeInfo getRollingUpgradeInfo() {
/**
 * Forwards the created-rollback-images flag to the current rolling upgrade
 * info. Does nothing when no rolling upgrade info is set.
 *
 * @param created the value to store on the rolling upgrade info
 */
public void setCreatedRollbackImages(boolean created) {
  if (rollingUpgradeInfo == null) {
    return;
  }
  rollingUpgradeInfo.setCreatedRollbackImages(created);
}
/**
* @return the current rolling upgrade info; this is the raw field value and
*         may be null (other code in this class null-checks it)
*/
public RollingUpgradeInfo getRollingUpgradeInfo() {
return rollingUpgradeInfo;
}
@ -7232,7 +7279,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final long startTime = rollingUpgradeInfo.getStartTime();
rollingUpgradeInfo = null;
return new RollingUpgradeInfo(blockPoolId, true, startTime, finalizeTime);
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
}
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)

View File

@ -78,7 +78,8 @@ public class GetImageServlet extends HttpServlet {
private static final String END_TXID_PARAM = "endTxId";
private static final String STORAGEINFO_PARAM = "storageInfo";
private static final String LATEST_FSIMAGE_VALUE = "latest";
private static final String IMAGE_FILE_TYPE = "imageFile";
private static Set<Long> currentlyDownloadingCheckpoints =
Collections.<Long>synchronizedSet(new HashSet<Long>());
@ -87,7 +88,7 @@ public class GetImageServlet extends HttpServlet {
final HttpServletResponse response
) throws ServletException, IOException {
try {
ServletContext context = getServletContext();
final ServletContext context = getServletContext();
final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
final GetImageParams parsedParams = new GetImageParams(request, response);
final Configuration conf = (Configuration) context
@ -121,13 +122,14 @@ public class GetImageServlet extends HttpServlet {
public Void run() throws Exception {
if (parsedParams.isGetImage()) {
long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
File imageFile = null;
String errorMessage = "Could not find image";
if (parsedParams.shouldFetchLatest()) {
imageFile = nnImage.getStorage().getHighestFsImageName();
} else {
errorMessage += " with txid " + txid;
imageFile = nnImage.getStorage().getFsImageName(txid);
imageFile = nnImage.getStorage().getFsImageName(txid, nnf);
}
if (imageFile == null) {
throw new IOException(errorMessage);
@ -155,6 +157,7 @@ public class GetImageServlet extends HttpServlet {
}
} else if (parsedParams.isPutImage()) {
final long txid = parsedParams.getTxId();
final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (! currentlyDownloadingCheckpoints.add(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT,
@ -164,7 +167,7 @@ public class GetImageServlet extends HttpServlet {
}
try {
if (nnImage.getStorage().findImageFile(txid) != null) {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " +
"for txid " + txid);
@ -178,11 +181,15 @@ public class GetImageServlet extends HttpServlet {
long start = now();
// issue a HTTP get request to download the new fsimage
MD5Hash downloadImageDigest =
TransferFsImage.downloadImageToStorage(
parsedParams.getInfoServer(conf), txid,
nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(txid, downloadImageDigest);
MD5Hash downloadImageDigest = TransferFsImage
.downloadImageToStorage(parsedParams.getInfoServer(conf),
nnf, txid, nnImage.getStorage(), true);
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
NameNodeHttpServer.getNameNodeFromContext(context)
.getNamesystem().setCreatedRollbackImages(true);
}
if (metrics != null) { // Metrics non-null only when used inside name node
long elapsed = now() - start;
@ -191,7 +198,7 @@ public class GetImageServlet extends HttpServlet {
// Now that we have a new checkpoint, we might be able to
// remove some old ones.
nnImage.purgeOldStorage(NameNodeFile.IMAGE);
nnImage.purgeOldStorage(nnf);
} finally {
currentlyDownloadingCheckpoints.remove(txid);
}
@ -315,9 +322,10 @@ public class GetImageServlet extends HttpServlet {
return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
}
static String getParamStringForImage(long txid,
static String getParamStringForImage(NameNodeFile nnf, long txid,
StorageInfo remoteStorageInfo) {
return "getimage=1&" + TXID_PARAM + "=" + txid
+ "&" + IMAGE_FILE_TYPE + "=" + nnf.name()
+ "&" + STORAGEINFO_PARAM + "=" +
remoteStorageInfo.toColonSeparatedString();
}
@ -330,7 +338,7 @@ public class GetImageServlet extends HttpServlet {
remoteStorageInfo.toColonSeparatedString();
}
static String getParamStringToPutImage(long txid,
static String getParamStringToPutImage(NameNodeFile nnf, long txid,
URL url, Storage storage) {
InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
.getAuthority());
@ -339,6 +347,7 @@ public class GetImageServlet extends HttpServlet {
: imageListenAddress.getHostName();
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
"&port=" + imageListenAddress.getPort() +
(machine != null ? "&machine=" + machine : "")
+ "&" + STORAGEINFO_PARAM + "=" +
@ -352,6 +361,7 @@ public class GetImageServlet extends HttpServlet {
private boolean isPutImage;
private int remoteport;
private String machineName;
private NameNodeFile nnf;
private long startTxId, endTxId, txId;
private String storageInfoString;
private boolean fetchLatest;
@ -376,6 +386,9 @@ public class GetImageServlet extends HttpServlet {
isGetImage = true;
try {
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} catch (NumberFormatException nfe) {
if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
fetchLatest = true;
@ -390,6 +403,9 @@ public class GetImageServlet extends HttpServlet {
} else if (key.equals("putimage")) {
isPutImage = true;
txId = ServletUtil.parseLongParam(request, TXID_PARAM);
String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
.valueOf(imageType);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
@ -420,7 +436,12 @@ public class GetImageServlet extends HttpServlet {
Preconditions.checkState(isGetImage || isPutImage);
return txId;
}
/**
 * Returns the image file type parsed from the request.
 *
 * @return the parsed NameNodeFile value
 * @throws IllegalStateException if the request is neither a get-image nor a
 *         put-image request
 */
public NameNodeFile getNameNodeFile() {
  final boolean imageRequest = isGetImage || isPutImage;
  Preconditions.checkState(imageRequest);
  return nnf;
}
public long getStartTxId() {
Preconditions.checkState(isGetEdit);
return startTxId;

View File

@ -25,6 +25,7 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -50,8 +51,8 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@ -499,21 +500,24 @@ public class NNStorage extends Storage implements Closeable,
}
/**
* Return the name of the image file.
* @return The name of the first image file.
* @return The first image file with the given txid and image type.
*/
public File getFsImageName(long txid) {
StorageDirectory sd = null;
for (Iterator<StorageDirectory> it =
dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
sd = it.next();
File fsImage = getStorageFile(sd, NameNodeFile.IMAGE, txid);
if(FileUtil.canRead(sd.getRoot()) && fsImage.exists())
/**
 * Scans the IMAGE storage directories for an image file of the given type
 * and transaction id.
 *
 * @param txid the transaction id of the wanted image
 * @param nnf the image file type to look for
 * @return the first matching file that exists in a directory whose root is
 *         readable, or null when there is none
 */
public File getFsImageName(long txid, NameNodeFile nnf) {
  final Iterator<StorageDirectory> dirs = dirIterator(NameNodeDirType.IMAGE);
  while (dirs.hasNext()) {
    final StorageDirectory dir = dirs.next();
    final File candidate = getStorageFile(dir, nnf, txid);
    if (!FileUtil.canRead(dir.getRoot())) {
      continue;
    }
    if (candidate.exists()) {
      return candidate;
    }
  }
  return null;
}
/**
* Convenience overload that looks up a NameNodeFile.IMAGE file.
*
* @param txid the transaction id of the wanted image
* @return whatever the two-argument overload returns for NameNodeFile.IMAGE
*/
public File getFsImageName(long txid) {
return getFsImageName(txid, NameNodeFile.IMAGE);
}
/**
* @return the image file looked up for the most recent checkpoint txid
*/
public File getHighestFsImageName() {
return getFsImageName(getMostRecentCheckpointTxId());
}
@ -697,12 +701,11 @@ public class NNStorage extends Storage implements Closeable,
return new File(sd.getCurrentDir(),
getTemporaryEditsFileName(startTxId, endTxId, timestamp));
}
static File getImageFile(StorageDirectory sd, long txid) {
return new File(sd.getCurrentDir(),
getImageFileName(txid));
static File getImageFile(StorageDirectory sd, NameNodeFile nnf, long txid) {
return new File(sd.getCurrentDir(), getNameNodeFileName(nnf, txid));
}
@VisibleForTesting
public static String getFinalizedEditsFileName(long startTxId, long endTxId) {
return String.format("%s_%019d-%019d", NameNodeFile.EDITS.getName(),
@ -730,12 +733,12 @@ public class NNStorage extends Storage implements Closeable,
}
/**
* Return the first readable image file for the given txid, or null
* if no such image can be found
* Return the first readable image file for the given txid and image type, or
* null if no such image can be found
*/
File findImageFile(long txid) {
File findImageFile(NameNodeFile nnf, long txid) {
return findFile(NameNodeDirType.IMAGE,
getImageFileName(txid));
getNameNodeFileName(nnf, txid));
}
/**
@ -980,7 +983,8 @@ public class NNStorage extends Storage implements Closeable,
* <b>Note:</b> this can mutate the storage info fields (ctime, version, etc).
* @throws IOException if no valid storage dirs are found or no valid layout version
*/
FSImageStorageInspector readAndInspectDirs(NameNodeFile nnf) throws IOException {
FSImageStorageInspector readAndInspectDirs(EnumSet<NameNodeFile> fileTypes)
throws IOException {
Integer layoutVersion = null;
boolean multipleLV = false;
StringBuilder layoutVersions = new StringBuilder();
@ -1017,7 +1021,7 @@ public class NNStorage extends Storage implements Closeable,
FSImageStorageInspector inspector;
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
inspector = new FSImageTransactionalStorageInspector(nnf);
inspector = new FSImageTransactionalStorageInspector(fileTypes);
} else {
inspector = new FSImagePreTransactionalStorageInspector();
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.TreeSet;
@ -96,7 +97,7 @@ public class NNStorageRetentionManager {
void purgeCheckpoinsAfter(NameNodeFile nnf, long fromTxId)
throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(nnf);
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
for (FSImageFile image : inspector.getFoundImages()) {
if (image.getCheckpointTxId() > fromTxId) {
@ -107,7 +108,7 @@ public class NNStorageRetentionManager {
void purgeOldStorage(NameNodeFile nnf) throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(nnf);
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
long minImageTxId = getImageTxIdToRetain(inspector);

View File

@ -445,8 +445,9 @@ public class SecondaryNameNode implements Runnable {
} else {
LOG.info("Image has changed. Downloading updated image from NN.");
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
nnHostPort, sig.mostRecentCheckpointTxId, dstImage.getStorage(), true);
dstImage.saveDigestAndRenameCheckpointImage(
nnHostPort, NameNodeFile.IMAGE, sig.mostRecentCheckpointTxId,
dstImage.getStorage(), true);
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
}
@ -555,7 +556,7 @@ public class SecondaryNameNode implements Runnable {
//
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
dstStorage, txid);
dstStorage, NameNodeFile.IMAGE, txid);
// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();
@ -997,7 +998,8 @@ public class SecondaryNameNode implements Runnable {
dstStorage.setStorageInfo(sig);
if (loadImage) {
File file = dstStorage.findImageFile(sig.mostRecentCheckpointTxId);
File file = dstStorage.findImageFile(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId);
if (file == null) {
throw new IOException("Couldn't find image file at txid " +
sig.mostRecentCheckpointTxId + " even though it should have " +

View File

@ -23,7 +23,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.DigestInputStream;
import java.security.MessageDigest;
@ -39,10 +38,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -50,11 +45,13 @@ import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@ -90,10 +87,10 @@ public class TransferFsImage {
null, false);
}
public static MD5Hash downloadImageToStorage(
URL fsName, long imageTxId, Storage dstStorage, boolean needDigest)
public static MD5Hash downloadImageToStorage(URL fsName, NameNodeFile nnf,
long imageTxId, Storage dstStorage, boolean needDigest)
throws IOException {
String fileid = GetImageServlet.getParamStringForImage(
String fileid = GetImageServlet.getParamStringForImage(nnf,
imageTxId, dstStorage);
String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
@ -166,14 +163,14 @@ public class TransferFsImage {
* @param myNNAddress the host/port where the local node is running an
* HTTPServer hosting GetImageServlet
* @param storage the storage directory to transfer the image from
* @param nnf the NameNodeFile type of the image
* @param txid the transaction ID of the image to be uploaded
*/
public static void uploadImageFromStorage(URL fsName,
URL myNNAddress,
Storage storage, long txid) throws IOException {
public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
Storage storage, NameNodeFile nnf, long txid) throws IOException {
String fileid = GetImageServlet.getParamStringToPutImage(
txid, myNNAddress, storage);
String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
myNNAddress, storage);
// this doesn't directly upload an image, but rather asks the NN
// to connect back to the 2NN to download the specified image.
try {

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
@ -207,9 +208,10 @@ public class BootstrapStandby implements Tool, Configurable {
// Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage(
otherHttpAddr, imageTxId,
otherHttpAddr, NameNodeFile.IMAGE, imageTxId,
storage, true);
image.saveDigestAndRenameCheckpointImage(imageTxId, hash);
image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
hash);
} catch (IOException ioe) {
image.close();
throw ioe;

View File

@ -142,9 +142,15 @@ public class StandbyCheckpointer {
}
}
/**
* Flags the checkpointer thread that a rollback checkpoint is needed and
* interrupts it.
*/
public void triggerRollbackCheckpoint() {
// set the flag before interrupting, presumably so the thread sees it as
// soon as the interrupt ends its sleep
thread.setNeedRollbackCheckpoint(true);
thread.interrupt();
}
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
final long txid;
final NameNodeFile imageType;
namesystem.writeLockInterruptibly();
try {
@ -164,7 +170,15 @@ public class StandbyCheckpointer {
return;
}
img.saveNamespace(namesystem, NameNodeFile.IMAGE, canceler);
if (namesystem.isRollingUpgrade()
&& !namesystem.getFSImage().hasRollbackFSImage()) {
// if we will do rolling upgrade but have not created the rollback image
// yet, name this checkpoint as fsimage_rollback
imageType = NameNodeFile.IMAGE_ROLLBACK;
} else {
imageType = NameNodeFile.IMAGE;
}
img.saveNamespace(namesystem, imageType, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
@ -181,7 +195,7 @@ public class StandbyCheckpointer {
@Override
public Void call() throws IOException {
TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
namesystem.getFSImage().getStorage(), txid);
namesystem.getFSImage().getStorage(), imageType, txid);
return null;
}
});
@ -228,6 +242,9 @@ public class StandbyCheckpointer {
private class CheckpointerThread extends Thread {
private volatile boolean shouldRun = true;
private volatile long preventCheckpointsUntil = 0;
// Indicate that a rollback checkpoint is required immediately. It will be
// reset to false after the checkpoint is done
private volatile boolean needRollbackCheckpoint = false;
private CheckpointerThread() {
super("Standby State Checkpointer");
@ -237,6 +254,10 @@ public class StandbyCheckpointer {
this.shouldRun = shouldRun;
}
/**
 * Sets the needRollbackCheckpoint flag of this thread.
 *
 * @param need the new value of the flag
 */
private void setNeedRollbackCheckpoint(boolean need) {
this.needRollbackCheckpoint = need;
}
@Override
public void run() {
// We have to make sure we're logged in as far as JAAS
@ -266,16 +287,19 @@ public class StandbyCheckpointer {
}
private void doWork() {
final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
// Reset checkpoint time so that we don't always checkpoint
// on startup.
lastCheckpointTime = now();
while (shouldRun) {
try {
Thread.sleep(1000 * checkpointConf.getCheckPeriod());
} catch (InterruptedException ie) {
}
if (!shouldRun) {
break;
if (!needRollbackCheckpoint) {
try {
Thread.sleep(checkPeriod);
} catch (InterruptedException ie) {
}
if (!shouldRun) {
break;
}
}
try {
// We may have lost our ticket since last checkpoint, log in again, just in case
@ -287,8 +311,10 @@ public class StandbyCheckpointer {
long uncheckpointed = countUncheckpointedTxns();
long secsSinceLast = (now - lastCheckpointTime)/1000;
boolean needCheckpoint = false;
if (uncheckpointed >= checkpointConf.getTxnCount()) {
boolean needCheckpoint = needRollbackCheckpoint;
if (needCheckpoint) {
LOG.info("Triggering a rollback fsimage for rolling upgrade.");
} else if (uncheckpointed >= checkpointConf.getTxnCount()) {
LOG.info("Triggering checkpoint because there have been " +
uncheckpointed + " txns since the last checkpoint, which " +
"exceeds the configured threshold " +
@ -313,6 +339,13 @@ public class StandbyCheckpointer {
if (needCheckpoint) {
doCheckpoint();
// reset needRollbackCheckpoint to false only when we finish a ckpt
// for rollback image
if (needRollbackCheckpoint
&& namesystem.getFSImage().hasRollbackFSImage()) {
namesystem.setCreatedRollbackImages(true);
needRollbackCheckpoint = false;
}
lastCheckpointTime = now;
}
} catch (SaveNamespaceCancelledException ce) {

View File

@ -83,7 +83,9 @@ public class TestRollingUpgrade {
runCmd(dfsadmin, true, "-rollingUpgrade");
//start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
runCmd(dfsadmin, true, "-rollingUpgrade", "prepare");
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
//query rolling upgrade
runCmd(dfsadmin, true, "-rollingUpgrade", "query");
@ -182,7 +184,9 @@ public class TestRollingUpgrade {
dfs.mkdirs(foo);
//start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
LOG.info("START\n" + info1);
//query rolling upgrade
@ -293,7 +297,9 @@ public class TestRollingUpgrade {
final DistributedFileSystem dfs = cluster.getFileSystem();
//start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
dfs.mkdirs(bar);
@ -378,26 +384,33 @@ public class TestRollingUpgrade {
}
}
/**
 * Checks whether any storage directory's current dir contains a file whose
 * name includes the IMAGE_ROLLBACK file name. The retrying variant looks up
 * to 10 times, sleeping one second after each unsuccessful pass.
 *
 * @param storage the NNStorage whose storage directories are scanned
 * @return true as soon as a matching file is found in any directory;
 *         false if none is found
 */
private static boolean existRollbackFsImage(NNStorage storage)
public static boolean existRollbackFsImage(NNStorage storage)
throws IOException {
// Accept any file name that contains the rollback image name.
final FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.indexOf(NNStorage.NameNodeFile.IMAGE_ROLLBACK.getName()) != -1;
}
};
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
File dir = storage.getStorageDir(i).getCurrentDir();
int l = dir.list(filter).length;
if (l > 0) {
return true;
// Upper bound on the number of scan passes.
final int total = 10;
int retry = 0;
while (retry++ < total) {
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
File dir = storage.getStorageDir(i).getCurrentDir();
// NOTE(review): File.list(filter) returns null when the path is not a
// directory or an I/O error occurs; this dereference is unguarded.
int l = dir.list(filter).length;
if (l > 0) {
return true;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Interruption is ignored; the loop just moves on to the next pass.
}
}
return false;
}
@Test
@Test (timeout = 300000)
public void testFinalize() throws Exception {
final Configuration conf = new HdfsConfiguration();
MiniQJMHACluster cluster = null;
@ -409,6 +422,11 @@ public class TestRollingUpgrade {
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
dfsCluster.waitActive();
// let NN1 tail editlog every 1s
dfsCluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
dfsCluster.restartNameNode(1);
dfsCluster.transitionToActive(0);
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
dfs.mkdirs(foo);

View File

@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
@ -101,8 +102,10 @@ public class TestRollingUpgradeRollback {
dfs.mkdirs(foo);
// start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
Assert.assertEquals(0,
dfsadmin.run(new String[] { "-rollingUpgrade", "prepare" }));
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
// create new directory
dfs.mkdirs(bar);
@ -160,8 +163,10 @@ public class TestRollingUpgradeRollback {
dfs.mkdirs(foo);
// start rolling upgrade
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
Assert.assertEquals(0,
dfsadmin.run(new String[] { "-rollingUpgrade", "prepare" }));
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
// create new directory
dfs.mkdirs(bar);
dfs.close();
@ -206,9 +211,9 @@ public class TestRollingUpgradeRollback {
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
dfsCluster.waitActive();
// let NN1 do checkpoints as fast as possible
// let NN1 tail editlog every 1s
dfsCluster.getConfiguration(1).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
dfsCluster.restartNameNode(1);
dfsCluster.transitionToActive(0);
@ -223,6 +228,11 @@ public class TestRollingUpgradeRollback {
dfs.mkdirs(bar);
dfs.close();
NNStorage storage0 = dfsCluster.getNameNode(0).getFSImage().getStorage();
NNStorage storage1 = dfsCluster.getNameNode(1).getFSImage().getStorage();
Assert.assertTrue(TestRollingUpgrade.existRollbackFsImage(storage0));
Assert.assertTrue(TestRollingUpgrade.existRollbackFsImage(storage1));
// rollback NN0
dfsCluster.restartNameNode(0, true, "-rollingUpgrade",
"rollback");
@ -238,21 +248,24 @@ public class TestRollingUpgradeRollback {
// check the details of NNStorage
NNStorage storage = dfsCluster.getNamesystem(0).getFSImage()
.getStorage();
// (startSegment, upgrade marker, mkdir, endSegment)
checkNNStorage(storage, 3, 7);
// segments:(startSegment, mkdir, start upgrade, endSegment),
// (startSegment, mkdir, endSegment)
checkNNStorage(storage, 4, 7);
// check storage in JNs
for (int i = 0; i < NUM_JOURNAL_NODES; i++) {
File dir = cluster.getJournalCluster().getCurrentDir(0,
MiniQJMHACluster.NAMESERVICE);
// segments:(startSegment, mkdir, endSegment), (startSegment, upgrade
// marker, mkdir, endSegment)
checkJNStorage(dir, 4, 7);
checkJNStorage(dir, 5, 7);
}
// restart NN0 again to make sure we can start using the new fsimage and
// the corresponding md5 checksum
dfsCluster.restartNameNode(0);
// start the rolling upgrade again to make sure we do not load upgrade
// status after the rollback
dfsCluster.transitionToActive(0);
dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -1454,7 +1454,7 @@ public class TestCheckpoint {
for (StorageDirectory sd :
image.getStorage().dirIterable(NameNodeDirType.IMAGE)) {
File imageFile = NNStorage.getImageFile(sd,
File imageFile = NNStorage.getImageFile(sd, NameNodeFile.IMAGE,
expectedTxIdToDownload + 5);
assertTrue("Image size increased",
imageFile.length() > fsimageLength);
@ -1980,7 +1980,8 @@ public class TestCheckpoint {
.when(dstImage).toColonSeparatedString();
try {
TransferFsImage.downloadImageToStorage(fsName, 0, dstImage, false);
TransferFsImage.downloadImageToStorage(fsName, NameNodeFile.IMAGE, 0,
dstImage, false);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);
@ -1996,7 +1997,8 @@ public class TestCheckpoint {
}
try {
TransferFsImage.uploadImageFromStorage(fsName, new URL("http://localhost:1234"), dstImage, 0);
TransferFsImage.uploadImageFromStorage(fsName, new URL(
"http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
fail("Storage info was not verified");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);