HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
arp 2014-09-08 21:20:55 -07:00
parent 3259e8954f
commit e736018601
10 changed files with 386 additions and 50 deletions

View File

@ -374,6 +374,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6800. Support Datanode layout changes with rolling upgrade. HDFS-6800. Support Datanode layout changes with rolling upgrade.
(James Thomas via Arpit Agarwal) (James Thomas via Arpit Agarwal)
HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -475,11 +475,14 @@ class BPOfferService {
* Signal the current rolling upgrade status as indicated by the NN. * Signal the current rolling upgrade status as indicated by the NN.
* @param inProgress true if a rolling upgrade is in progress * @param inProgress true if a rolling upgrade is in progress
*/ */
void signalRollingUpgrade(boolean inProgress) { void signalRollingUpgrade(boolean inProgress) throws IOException {
String bpid = getBlockPoolId();
if (inProgress) { if (inProgress) {
dn.getFSDataset().enableTrash(getBlockPoolId()); dn.getFSDataset().enableTrash(bpid);
dn.getFSDataset().setRollingUpgradeMarker(bpid);
} else { } else {
dn.getFSDataset().restoreTrash(getBlockPoolId()); dn.getFSDataset().restoreTrash(bpid);
dn.getFSDataset().clearRollingUpgradeMarker(bpid);
} }
} }

View File

@ -627,7 +627,7 @@ class BPServiceActor implements Runnable {
bpos.shutdownActor(this); bpos.shutdownActor(this);
} }
private void handleRollingUpgradeStatus(HeartbeatResponse resp) { private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus(); RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus();
if (rollingUpgradeStatus != null && if (rollingUpgradeStatus != null &&
rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) { rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.HardLink;
@ -38,8 +39,11 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -62,6 +66,18 @@ import java.util.regex.Pattern;
public class BlockPoolSliceStorage extends Storage { public class BlockPoolSliceStorage extends Storage {
static final String TRASH_ROOT_DIR = "trash"; static final String TRASH_ROOT_DIR = "trash";
/**
* A marker file that is created on each root directory if a rolling upgrade
* is in progress. The NN does not inform the DN when a rolling upgrade is
* finalized. All the DN can infer is whether or not a rolling upgrade is
* currently in progress. When the rolling upgrade is not in progress:
* 1. If the marker file is present, then a rolling upgrade just completed.
* If a 'previous' directory exists, it can be deleted now.
* 2. If the marker file is absent, then a regular upgrade may be in
* progress. Do not delete the 'previous' directory.
*/
static final String ROLLING_UPGRADE_MARKER_FILE = "RollingUpgradeInProgress";
private static final String BLOCK_POOL_ID_PATTERN_BASE = private static final String BLOCK_POOL_ID_PATTERN_BASE =
Pattern.quote(File.separator) + Pattern.quote(File.separator) +
"BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" + "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
@ -83,6 +99,13 @@ public class BlockPoolSliceStorage extends Storage {
blockpoolID = bpid; blockpoolID = bpid;
} }
/**
* These maps are used as an optimization to avoid one filesystem operation
* per storage on each heartbeat response.
*/
private static Set<String> storagesWithRollingUpgradeMarker;
private static Set<String> storagesWithoutRollingUpgradeMarker;
BlockPoolSliceStorage(int namespaceID, String bpID, long cTime, BlockPoolSliceStorage(int namespaceID, String bpID, long cTime,
String clusterId) { String clusterId) {
super(NodeType.DATA_NODE); super(NodeType.DATA_NODE);
@ -90,10 +113,18 @@ public class BlockPoolSliceStorage extends Storage {
this.blockpoolID = bpID; this.blockpoolID = bpID;
this.cTime = cTime; this.cTime = cTime;
this.clusterID = clusterId; this.clusterID = clusterId;
storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
} }
private BlockPoolSliceStorage() { private BlockPoolSliceStorage() {
super(NodeType.DATA_NODE); super(NodeType.DATA_NODE);
storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
new ConcurrentHashMap<String, Boolean>());
} }
/** /**
@ -270,13 +301,9 @@ public class BlockPoolSliceStorage extends Storage {
private void doTransition(DataNode datanode, StorageDirectory sd, private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
// we will already restore everything in the trash by rolling back to Preconditions.checkState(!getTrashRootDir(sd).exists(),
// the previous directory, so we must delete the trash to ensure sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
// that it's not restored by BPOfferService.signalRollingUpgrade() " both be present.");
if (!FileUtil.fullyDelete(getTrashRootDir(sd))) {
throw new IOException("Unable to delete trash directory prior to " +
"restoration of previous directory: " + getTrashRootDir(sd));
}
doRollback(sd, nsInfo); // rollback if applicable doRollback(sd, nsInfo); // rollback if applicable
} else { } else {
// Restore all the files in the trash. The restored files are retained // Restore all the files in the trash. The restored files are retained
@ -440,10 +467,18 @@ public class BlockPoolSliceStorage extends Storage {
} }
final File newChild = new File(restoreDirectory, child.getName()); final File newChild = new File(restoreDirectory, child.getName());
if (!child.renameTo(newChild)) {
if (newChild.exists() && newChild.length() >= child.length()) {
// Failsafe - we should not hit this case but let's make sure
// we never overwrite a newer version of a block file with an
// older version.
LOG.info("Not overwriting " + newChild + " with smaller file from " +
"trash directory. This message can be safely ignored.");
} else if (!child.renameTo(newChild)) {
throw new IOException("Failed to rename " + child + " to " + newChild); throw new IOException("Failed to rename " + child + " to " + newChild);
} else {
++filesRestored;
} }
++filesRestored;
} }
FileUtil.fullyDelete(trashRoot); FileUtil.fullyDelete(trashRoot);
return filesRestored; return filesRestored;
@ -599,6 +634,18 @@ public class BlockPoolSliceStorage extends Storage {
return new File(sd.getRoot(), TRASH_ROOT_DIR); return new File(sd.getRoot(), TRASH_ROOT_DIR);
} }
/**
* Determine whether we can use trash for the given blockFile. Trash
* is disallowed if a 'previous' directory exists for the
* storage directory containing the block.
*/
@VisibleForTesting
public boolean isTrashAllowed(File blockFile) {
Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
String previousDir = matcher.replaceFirst("$1$2" + STORAGE_DIR_PREVIOUS);
return !(new File(previousDir)).exists();
}
/** /**
* Get a target subdirectory under trash/ for a given block file that is being * Get a target subdirectory under trash/ for a given block file that is being
* deleted. * deleted.
@ -609,9 +656,12 @@ public class BlockPoolSliceStorage extends Storage {
* @return the trash directory for a given block file that is being deleted. * @return the trash directory for a given block file that is being deleted.
*/ */
public String getTrashDirectory(File blockFile) { public String getTrashDirectory(File blockFile) {
Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent()); if (isTrashAllowed(blockFile)) {
String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4"); Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
return trashDirectory; String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
return trashDirectory;
}
return null;
} }
/** /**
@ -638,6 +688,7 @@ public class BlockPoolSliceStorage extends Storage {
for (StorageDirectory sd : storageDirs) { for (StorageDirectory sd : storageDirs) {
File trashRoot = getTrashRootDir(sd); File trashRoot = getTrashRootDir(sd);
try { try {
Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
restoreBlockFilesFromTrash(trashRoot); restoreBlockFilesFromTrash(trashRoot);
FileUtil.fullyDelete(getTrashRootDir(sd)); FileUtil.fullyDelete(getTrashRootDir(sd));
} catch (IOException ioe) { } catch (IOException ioe) {
@ -656,4 +707,49 @@ public class BlockPoolSliceStorage extends Storage {
} }
return false; return false;
} }
/**
* Create a rolling upgrade marker file for each BP storage root, if it
* does not exist already.
*/
public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
if (!markerFile.exists() && markerFile.createNewFile()) {
LOG.info("Created " + markerFile);
} else {
LOG.info(markerFile + " already exists.");
}
storagesWithRollingUpgradeMarker.add(bpRoot.toString());
storagesWithoutRollingUpgradeMarker.remove(bpRoot.toString());
}
}
}
/**
* Check whether the rolling upgrade marker file exists for each BP storage
* root. If it does exist, then the marker file is cleared and more
* importantly the layout upgrade is finalized.
*/
public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
throws IOException {
for (StorageDirectory sd : dnStorageDirs) {
File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {
if (markerFile.exists()) {
LOG.info("Deleting " + markerFile);
doFinalize(sd.getCurrentDir());
if (!markerFile.delete()) {
LOG.warn("Failed to delete " + markerFile);
}
}
storagesWithoutRollingUpgradeMarker.add(bpRoot.toString());
storagesWithRollingUpgradeMarker.remove(bpRoot.toString());
}
}
}
} }

View File

@ -70,11 +70,17 @@ public class DataStorage extends Storage {
public final static String STORAGE_DIR_FINALIZED = "finalized"; public final static String STORAGE_DIR_FINALIZED = "finalized";
public final static String STORAGE_DIR_TMP = "tmp"; public final static String STORAGE_DIR_TMP = "tmp";
// Set of bpids for which 'trash' is currently enabled. /**
// When trash is enabled block files are moved under a separate * Set of bpids for which 'trash' is currently enabled.
// 'trash' folder instead of being deleted right away. This can * When trash is enabled block files are moved under a separate
// be useful during rolling upgrades, for example. * 'trash' folder instead of being deleted right away. This can
// The set is backed by a concurrent HashMap. * be useful during rolling upgrades, for example.
* The set is backed by a concurrent HashMap.
*
* Even if trash is enabled, it is not used if a layout upgrade
* is in progress for a storage directory i.e. if the previous
* directory exists.
*/
private Set<String> trashEnabledBpids; private Set<String> trashEnabledBpids;
/** /**
@ -123,7 +129,9 @@ public class DataStorage extends Storage {
} }
/** /**
* Enable trash for the specified block pool storage. * Enable trash for the specified block pool storage. Even if trash is
* enabled by the caller, it is superseded by the 'previous' directory
* if a layout upgrade is in progress.
*/ */
public void enableTrash(String bpid) { public void enableTrash(String bpid) {
if (trashEnabledBpids.add(bpid)) { if (trashEnabledBpids.add(bpid)) {
@ -143,6 +151,14 @@ public class DataStorage extends Storage {
return trashEnabledBpids.contains(bpid); return trashEnabledBpids.contains(bpid);
} }
public void setRollingUpgradeMarker(String bpid) throws IOException {
getBPStorage(bpid).setRollingUpgradeMarkers(storageDirs);
}
public void clearRollingUpgradeMarker(String bpid) throws IOException {
getBPStorage(bpid).clearRollingUpgradeMarkers(storageDirs);
}
/** /**
* If rolling upgrades are in progress then do not delete block files * If rolling upgrades are in progress then do not delete block files
* immediately. Instead we move the block files to an intermediate * immediately. Instead we move the block files to an intermediate
@ -675,7 +691,8 @@ public class DataStorage extends Storage {
if (DataNodeLayoutVersion.supports( if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) { LayoutVersion.Feature.FEDERATION, layoutVersion)) {
// The VERSION file is already read in. Override the layoutVersion // The VERSION file is already read in. Override the layoutVersion
// field and overwrite the file. // field and overwrite the file. The upgrade work is handled by
// {@link BlockPoolSliceStorage#doUpgrade}
LOG.info("Updating layout version from " + layoutVersion + " to " LOG.info("Updating layout version from " + layoutVersion + " to "
+ HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage " + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage "
+ sd.getRoot()); + sd.getRoot());

View File

@ -443,6 +443,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/ */
public boolean trashEnabled(String bpid); public boolean trashEnabled(String bpid);
/**
* Create a marker file indicating that a rolling upgrade is in progress.
*/
public void setRollingUpgradeMarker(String bpid) throws IOException;
/**
* Delete the rolling upgrade marker file if it exists.
* @param bpid
*/
public void clearRollingUpgradeMarker(String bpid) throws IOException;
/** /**
* submit a sync_file_range request to AsyncDiskService * submit a sync_file_range request to AsyncDiskService
*/ */

View File

@ -2039,6 +2039,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return dataStorage.trashEnabled(bpid); return dataStorage.trashEnabled(bpid);
} }
@Override
public void setRollingUpgradeMarker(String bpid) throws IOException {
dataStorage.setRollingUpgradeMarker(bpid);
}
@Override
public void clearRollingUpgradeMarker(String bpid) throws IOException {
dataStorage.clearRollingUpgradeMarker(bpid);
}
@Override @Override
public RollingLogs createRollingLogs(String bpid, String prefix public RollingLogs createRollingLogs(String bpid, String prefix
) throws IOException { ) throws IOException {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -77,6 +79,8 @@ import org.apache.hadoop.util.VersionInfo;
import org.junit.Assume; import org.junit.Assume;
import java.io.*; import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
@ -1420,4 +1424,38 @@ public class DFSTestUtil {
} }
return expectedPrimary.getDatanodeDescriptor(); return expectedPrimary.getDatanodeDescriptor();
} }
public static void addDataNodeLayoutVersion(final int lv, final String description)
throws NoSuchFieldException, IllegalAccessException {
Preconditions.checkState(lv < DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Override {@link DataNodeLayoutVersion#CURRENT_LAYOUT_VERSION} via reflection.
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
Field field = DataNodeLayoutVersion.class.getField("CURRENT_LAYOUT_VERSION");
field.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.setInt(null, lv);
// Override {@link HdfsConstants#DATANODE_LAYOUT_VERSION}
field = HdfsConstants.class.getField("DATANODE_LAYOUT_VERSION");
field.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.setInt(null, lv);
// Inject the feature into the FEATURES map.
final LayoutVersion.FeatureInfo featureInfo =
new LayoutVersion.FeatureInfo(lv, lv + 1, description, false);
final LayoutVersion.LayoutFeature feature =
new LayoutVersion.LayoutFeature() {
@Override
public LayoutVersion.FeatureInfo getInfo() {
return featureInfo;
}
};
// Update the FEATURES map with the new layout version.
LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
new LayoutVersion.LayoutFeature[] { feature });
}
} }

View File

@ -1072,6 +1072,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return false; return false;
} }
@Override
public void setRollingUpgradeMarker(String bpid) {
}
@Override
public void clearRollingUpgradeMarker(String bpid) {
}
@Override @Override
public void checkAndUpdate(String bpid, long blockId, File diskFile, public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) { File diskMetaFile, FsVolumeSpi vol) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -28,6 +29,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test; import org.junit.Test;
/** /**
@ -64,7 +68,7 @@ public class TestDataNodeRollingUpgrade {
Configuration conf; Configuration conf;
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
DistributedFileSystem fs = null; DistributedFileSystem fs = null;
DataNode dn = null; DataNode dn0 = null;
NameNode nn = null; NameNode nn = null;
String blockPoolId = null; String blockPoolId = null;
@ -76,8 +80,8 @@ public class TestDataNodeRollingUpgrade {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
nn = cluster.getNameNode(0); nn = cluster.getNameNode(0);
assertNotNull(nn); assertNotNull(nn);
dn = cluster.getDataNodes().get(0); dn0 = cluster.getDataNodes().get(0);
assertNotNull(dn); assertNotNull(dn0);
blockPoolId = cluster.getNameNode(0).getNamesystem().getBlockPoolId(); blockPoolId = cluster.getNameNode(0).getNamesystem().getBlockPoolId();
} }
@ -88,7 +92,7 @@ public class TestDataNodeRollingUpgrade {
} }
fs = null; fs = null;
nn = null; nn = null;
dn = null; dn0 = null;
blockPoolId = null; blockPoolId = null;
} }
@ -103,9 +107,10 @@ public class TestDataNodeRollingUpgrade {
private File getBlockForFile(Path path, boolean exists) throws IOException { private File getBlockForFile(Path path, boolean exists) throws IOException {
LocatedBlocks blocks = nn.getRpcServer().getBlockLocations(path.toString(), LocatedBlocks blocks = nn.getRpcServer().getBlockLocations(path.toString(),
0, Long.MAX_VALUE); 0, Long.MAX_VALUE);
assertEquals(1, blocks.getLocatedBlocks().size()); assertEquals("The test helper functions assume that each file has a single block",
1, blocks.getLocatedBlocks().size());
ExtendedBlock block = blocks.getLocatedBlocks().get(0).getBlock(); ExtendedBlock block = blocks.getLocatedBlocks().get(0).getBlock();
BlockLocalPathInfo bInfo = dn.getFSDataset().getBlockLocalPathInfo(block); BlockLocalPathInfo bInfo = dn0.getFSDataset().getBlockLocalPathInfo(block);
File blockFile = new File(bInfo.getBlockPath()); File blockFile = new File(bInfo.getBlockPath());
assertEquals(exists, blockFile.exists()); assertEquals(exists, blockFile.exists());
return blockFile; return blockFile;
@ -113,7 +118,7 @@ public class TestDataNodeRollingUpgrade {
private File getTrashFileForBlock(File blockFile, boolean exists) { private File getTrashFileForBlock(File blockFile, boolean exists) {
File trashFile = new File( File trashFile = new File(
dn.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile)); dn0.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile));
assertEquals(exists, trashFile.exists()); assertEquals(exists, trashFile.exists());
return trashFile; return trashFile;
} }
@ -135,11 +140,10 @@ public class TestDataNodeRollingUpgrade {
assertFalse(blockFile.exists()); assertFalse(blockFile.exists());
} }
private void ensureTrashDisabled() { private boolean isTrashRootPresent() {
// Trash is disabled; trash root does not exist // Trash is disabled; trash root does not exist
assertFalse(dn.getFSDataset().trashEnabled(blockPoolId)); BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId);
BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId); return bps.trashEnabled();
assertFalse(bps.trashEnabled());
} }
/** /**
@ -149,17 +153,25 @@ public class TestDataNodeRollingUpgrade {
throws Exception { throws Exception {
assertTrue(blockFile.exists()); assertTrue(blockFile.exists());
assertFalse(trashFile.exists()); assertFalse(trashFile.exists());
ensureTrashDisabled(); assertFalse(isTrashRootPresent());
}
private boolean isBlockFileInPrevious(File blockFile) {
Pattern blockFilePattern = Pattern.compile("^(.*/current/.*/)(current)(/.*)$");
Matcher matcher = blockFilePattern.matcher(blockFile.toString());
String previousFileName = matcher.replaceFirst("$1" + "previous" + "$3");
return ((new File(previousFileName)).exists());
} }
private void startRollingUpgrade() throws Exception { private void startRollingUpgrade() throws Exception {
LOG.info("Starting rolling upgrade"); LOG.info("Starting rolling upgrade");
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
final DFSAdmin dfsadmin = new DFSAdmin(conf); final DFSAdmin dfsadmin = new DFSAdmin(conf);
TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare"); TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare");
triggerHeartBeats(); triggerHeartBeats();
// Ensure datanode rolling upgrade is started // Ensure datanode rolling upgrade is started
assertTrue(dn.getFSDataset().trashEnabled(blockPoolId)); assertTrue(dn0.getFSDataset().trashEnabled(blockPoolId));
} }
private void finalizeRollingUpgrade() throws Exception { private void finalizeRollingUpgrade() throws Exception {
@ -169,8 +181,8 @@ public class TestDataNodeRollingUpgrade {
triggerHeartBeats(); triggerHeartBeats();
// Ensure datanode rolling upgrade is started // Ensure datanode rolling upgrade is started
assertFalse(dn.getFSDataset().trashEnabled(blockPoolId)); assertFalse(dn0.getFSDataset().trashEnabled(blockPoolId));
BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId); BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId);
assertFalse(bps.trashEnabled()); assertFalse(bps.trashEnabled());
} }
@ -179,13 +191,15 @@ public class TestDataNodeRollingUpgrade {
// Restart the namenode with rolling upgrade rollback // Restart the namenode with rolling upgrade rollback
LOG.info("Starting rollback of the rolling upgrade"); LOG.info("Starting rollback of the rolling upgrade");
MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0); MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
dnprop.setDnArgs("-rollback");
cluster.shutdownNameNodes(); cluster.shutdownNameNodes();
cluster.restartNameNode("-rollingupgrade", "rollback"); cluster.restartNameNode("-rollingupgrade", "rollback");
cluster.restartDataNode(dnprop); cluster.restartDataNode(dnprop);
cluster.waitActive(); cluster.waitActive();
nn = cluster.getNameNode(0); nn = cluster.getNameNode(0);
dn = cluster.getDataNodes().get(0); dn0 = cluster.getDataNodes().get(0);
triggerHeartBeats(); triggerHeartBeats();
LOG.info("The cluster is active after rollback");
} }
@Test (timeout=600000) @Test (timeout=600000)
@ -194,12 +208,11 @@ public class TestDataNodeRollingUpgrade {
startCluster(); startCluster();
// Create files in DFS. // Create files in DFS.
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat"); Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
Path testFile2 = new Path("/TestDataNodeRollingUpgrade2.dat"); Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED); DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED); DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
startRollingUpgrade(); startRollingUpgrade();
File blockFile = getBlockForFile(testFile2, true); File blockFile = getBlockForFile(testFile2, true);
File trashFile = getTrashFileForBlock(blockFile, false); File trashFile = getTrashFileForBlock(blockFile, false);
@ -207,7 +220,7 @@ public class TestDataNodeRollingUpgrade {
finalizeRollingUpgrade(); finalizeRollingUpgrade();
// Ensure that delete file testFile2 stays deleted after finalize // Ensure that delete file testFile2 stays deleted after finalize
ensureTrashDisabled(); assertFalse(isTrashRootPresent());
assert(!fs.exists(testFile2)); assert(!fs.exists(testFile2));
assert(fs.exists(testFile1)); assert(fs.exists(testFile1));
@ -222,11 +235,10 @@ public class TestDataNodeRollingUpgrade {
startCluster(); startCluster();
// Create files in DFS. // Create files in DFS.
Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat"); Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED); DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
String fileContents1 = DFSTestUtil.readFile(fs, testFile1); String fileContents1 = DFSTestUtil.readFile(fs, testFile1);
fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
startRollingUpgrade(); startRollingUpgrade();
File blockFile = getBlockForFile(testFile1, true); File blockFile = getBlockForFile(testFile1, true);
@ -255,9 +267,9 @@ public class TestDataNodeRollingUpgrade {
startCluster(); startCluster();
// Create files in DFS. // Create files in DFS.
String testFile1 = "/TestDataNodeXceiver1.dat"; String testFile1 = "/" + GenericTestUtils.getMethodName() + ".01.dat";
String testFile2 = "/TestDataNodeXceiver2.dat"; String testFile2 = "/" + GenericTestUtils.getMethodName() + ".02.dat";
String testFile3 = "/TestDataNodeXceiver3.dat"; String testFile3 = "/" + GenericTestUtils.getMethodName() + ".03.dat";
DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf); DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf); DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
@ -277,12 +289,12 @@ public class TestDataNodeRollingUpgrade {
s3.write(toWrite, 0, 1024*1024*8); s3.write(toWrite, 0, 1024*1024*8);
s3.flush(); s3.flush();
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer() assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer()
.getNumPeersXceiver()); .getNumPeersXceiver());
s1.close(); s1.close();
s2.close(); s2.close();
s3.close(); s3.close();
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer() assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer()
.getNumPeersXceiver()); .getNumPeersXceiver());
client1.close(); client1.close();
client2.close(); client2.close();
@ -291,4 +303,143 @@ public class TestDataNodeRollingUpgrade {
shutdownCluster(); shutdownCluster();
} }
} }
/**
* Support for layout version change with rolling upgrade was
* added by HDFS-6800 and HDFS-6981.
*/
@Test(timeout=300000)
public void testWithLayoutChangeAndFinalize() throws Exception {
final long seed = 0x600DF00D;
try {
startCluster();
Path[] paths = new Path[3];
File[] blockFiles = new File[3];
// Create two files in DFS.
for (int i = 0; i < 2; ++i) {
paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat");
DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 2, seed);
}
startRollingUpgrade();
// Delete the first file. The DN will save its block files in trash.
blockFiles[0] = getBlockForFile(paths[0], true);
File trashFile0 = getTrashFileForBlock(blockFiles[0], false);
deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0);
// Restart the DN with a new layout version to trigger layout upgrade.
LOG.info("Shutting down the Datanode");
MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
DFSTestUtil.addDataNodeLayoutVersion(
DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1,
"Test Layout for TestDataNodeRollingUpgrade");
LOG.info("Restarting the DataNode");
cluster.restartDataNode(dnprop, true);
cluster.waitActive();
dn0 = cluster.getDataNodes().get(0);
LOG.info("The DN has been restarted");
assertFalse(trashFile0.exists());
assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0]));
// Ensure that the block file for the first file was moved from 'trash' to 'previous'.
assertTrue(isBlockFileInPrevious(blockFiles[0]));
assertFalse(isTrashRootPresent());
// Delete the second file. Ensure that its block file is in previous.
blockFiles[1] = getBlockForFile(paths[1], true);
fs.delete(paths[1], false);
assertTrue(isBlockFileInPrevious(blockFiles[1]));
assertFalse(isTrashRootPresent());
// Rollback and ensure that neither block file exists in trash or previous.
finalizeRollingUpgrade();
assertFalse(isTrashRootPresent());
assertFalse(isBlockFileInPrevious(blockFiles[0]));
assertFalse(isBlockFileInPrevious(blockFiles[1]));
} finally {
shutdownCluster();
}
}
/**
* Support for layout version change with rolling upgrade was
* added by HDFS-6800 and HDFS-6981.
*/
@Test(timeout=300000)
public void testWithLayoutChangeAndRollback() throws Exception {
final long seed = 0x600DF00D;
try {
startCluster();
Path[] paths = new Path[3];
File[] blockFiles = new File[3];
// Create two files in DFS.
for (int i = 0; i < 2; ++i) {
paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat");
DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 1, seed);
}
startRollingUpgrade();
// Delete the first file. The DN will save its block files in trash.
blockFiles[0] = getBlockForFile(paths[0], true);
File trashFile0 = getTrashFileForBlock(blockFiles[0], false);
deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0);
// Restart the DN with a new layout version to trigger layout upgrade.
LOG.info("Shutting down the Datanode");
MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
DFSTestUtil.addDataNodeLayoutVersion(
DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1,
"Test Layout for TestDataNodeRollingUpgrade");
LOG.info("Restarting the DataNode");
cluster.restartDataNode(dnprop, true);
cluster.waitActive();
dn0 = cluster.getDataNodes().get(0);
LOG.info("The DN has been restarted");
assertFalse(trashFile0.exists());
assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0]));
// Ensure that the block file for the first file was moved from 'trash' to 'previous'.
assertTrue(isBlockFileInPrevious(blockFiles[0]));
assertFalse(isTrashRootPresent());
// Delete the second file. Ensure that its block file is in previous.
blockFiles[1] = getBlockForFile(paths[1], true);
fs.delete(paths[1], false);
assertTrue(isBlockFileInPrevious(blockFiles[1]));
assertFalse(isTrashRootPresent());
// Create and delete a third file. Its block file should not be
// in either trash or previous after deletion.
paths[2] = new Path("/" + GenericTestUtils.getMethodName() + ".2.dat");
DFSTestUtil.createFile(fs, paths[2], BLOCK_SIZE, (short) 1, seed);
blockFiles[2] = getBlockForFile(paths[2], true);
fs.delete(paths[2], false);
assertFalse(isBlockFileInPrevious(blockFiles[2]));
assertFalse(isTrashRootPresent());
// Rollback and ensure that the first two file contents were restored.
rollbackRollingUpgrade();
for (int i = 0; i < 2; ++i) {
byte[] actual = DFSTestUtil.readFileBuffer(fs, paths[i]);
byte[] calculated = DFSTestUtil.calculateFileContentsFromSeed(seed, BLOCK_SIZE);
assertArrayEquals(actual, calculated);
}
// And none of the block files must be in previous or trash.
assertFalse(isTrashRootPresent());
for (int i = 0; i < 3; ++i) {
assertFalse(isBlockFileInPrevious(blockFiles[i]));
}
} finally {
shutdownCluster();
}
}
} }