From f949f6b54825dac61511a5761837e2fd14437239 Mon Sep 17 00:00:00 2001 From: arp Date: Mon, 8 Sep 2014 21:20:55 -0700 Subject: [PATCH] HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/datanode/BPOfferService.java | 9 +- .../hdfs/server/datanode/BPServiceActor.java | 2 +- .../datanode/BlockPoolSliceStorage.java | 120 +++++++++- .../hdfs/server/datanode/DataStorage.java | 31 ++- .../datanode/fsdataset/FsDatasetSpi.java | 11 + .../fsdataset/impl/FsDatasetImpl.java | 10 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 38 ++++ .../server/datanode/SimulatedFSDataset.java | 8 + .../datanode/TestDataNodeRollingUpgrade.java | 205 +++++++++++++++--- 10 files changed, 386 insertions(+), 50 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 72c0a8fb731..549f6e17131 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -632,6 +632,8 @@ Release 2.6.0 - UNRELEASED HDFS-6951. Correctly persist raw namespace xattrs to edit log and fsimage. (clamb via wang) + HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 822c03d8c3f..3121496f5fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -474,11 +474,14 @@ class BPOfferService { * Signal the current rolling upgrade status as indicated by the NN. 
* @param inProgress true if a rolling upgrade is in progress */ - void signalRollingUpgrade(boolean inProgress) { + void signalRollingUpgrade(boolean inProgress) throws IOException { + String bpid = getBlockPoolId(); if (inProgress) { - dn.getFSDataset().enableTrash(getBlockPoolId()); + dn.getFSDataset().enableTrash(bpid); + dn.getFSDataset().setRollingUpgradeMarker(bpid); } else { - dn.getFSDataset().restoreTrash(getBlockPoolId()); + dn.getFSDataset().restoreTrash(bpid); + dn.getFSDataset().clearRollingUpgradeMarker(bpid); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 59ca11a5404..7d3068879b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -627,7 +627,7 @@ class BPServiceActor implements Runnable { bpos.shutdownActor(this); } - private void handleRollingUpgradeStatus(HeartbeatResponse resp) { + private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException { RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus(); if (rollingUpgradeStatus != null && rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index b7f688dca4d..8333bb4af53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; @@ -38,8 +39,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -62,6 +66,18 @@ import java.util.regex.Pattern; public class BlockPoolSliceStorage extends Storage { static final String TRASH_ROOT_DIR = "trash"; + /** + * A marker file that is created on each root directory if a rolling upgrade + * is in progress. The NN does not inform the DN when a rolling upgrade is + * finalized. All the DN can infer is whether or not a rolling upgrade is + * currently in progress. When the rolling upgrade is not in progress: + * 1. If the marker file is present, then a rolling upgrade just completed. + * If a 'previous' directory exists, it can be deleted now. + * 2. If the marker file is absent, then a regular upgrade may be in + * progress. Do not delete the 'previous' directory. 
+   */
+  static final String ROLLING_UPGRADE_MARKER_FILE = "RollingUpgradeInProgress";
+
   private static final String BLOCK_POOL_ID_PATTERN_BASE =
       Pattern.quote(File.separator) +
       "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
@@ -83,6 +99,13 @@ public class BlockPoolSliceStorage extends Storage {
     blockpoolID = bpid;
   }
 
+  /**
+   * These sets are used as an optimization to avoid one filesystem operation
+   * per storage on each heartbeat response.
+   */
+  private static Set<String> storagesWithRollingUpgradeMarker;
+  private static Set<String> storagesWithoutRollingUpgradeMarker;
+
   BlockPoolSliceStorage(int namespaceID, String bpID, long cTime,
       String clusterId) {
     super(NodeType.DATA_NODE);
@@ -90,10 +113,18 @@ public class BlockPoolSliceStorage extends Storage {
     this.blockpoolID = bpID;
     this.cTime = cTime;
     this.clusterID = clusterId;
+    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
+    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
 
   private BlockPoolSliceStorage() {
     super(NodeType.DATA_NODE);
+    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
+    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
 
   /**
@@ -270,13 +301,9 @@ public class BlockPoolSliceStorage extends Storage {
   private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
-      // we will already restore everything in the trash by rolling back to
-      // the previous directory, so we must delete the trash to ensure
-      // that it's not restored by BPOfferService.signalRollingUpgrade()
-      if (!FileUtil.fullyDelete(getTrashRootDir(sd))) {
-        throw new IOException("Unable to delete trash directory prior to " +
-            "restoration of previous directory: " + getTrashRootDir(sd));
-      }
+      Preconditions.checkState(!getTrashRootDir(sd).exists(),
+          sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not" +
+          " both be present.");
       doRollback(sd, nsInfo); // rollback if applicable
     } else {
       // Restore all the files in the trash. The restored files are retained
@@ -440,10 +467,18 @@ public class BlockPoolSliceStorage extends Storage {
       }
 
       final File newChild = new File(restoreDirectory, child.getName());
-      if (!child.renameTo(newChild)) {
+
+      if (newChild.exists() && newChild.length() >= child.length()) {
+        // Failsafe - we should not hit this case but let's make sure
+        // we never overwrite a newer version of a block file with an
+        // older version.
+        LOG.info("Not overwriting " + newChild + " with an equal or smaller " +
+            "file from the trash directory. This message can be safely " +
+            "ignored.");
+      } else if (!child.renameTo(newChild)) {
         throw new IOException("Failed to rename " + child + " to " + newChild);
+      } else {
+        ++filesRestored;
       }
-      ++filesRestored;
     }
     FileUtil.fullyDelete(trashRoot);
     return filesRestored;
@@ -599,6 +634,18 @@ public class BlockPoolSliceStorage extends Storage {
     return new File(sd.getRoot(), TRASH_ROOT_DIR);
   }
 
+  /**
+   * Determine whether we can use trash for the given blockFile. Trash
+   * is disallowed if a 'previous' directory exists for the
+   * storage directory containing the block.
+   */
+  @VisibleForTesting
+  public boolean isTrashAllowed(File blockFile) {
+    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+    String previousDir = matcher.replaceFirst("$1$2" + STORAGE_DIR_PREVIOUS);
+    return !(new File(previousDir)).exists();
+  }
+
   /**
    * Get a target subdirectory under trash/ for a given block file that is being
    * deleted.
@@ -609,9 +656,12 @@ public class BlockPoolSliceStorage extends Storage {
    * @return the trash directory for a given block file that is being deleted.
    */
   public String getTrashDirectory(File blockFile) {
-    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
-    String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
-    return trashDirectory;
+    if (isTrashAllowed(blockFile)) {
+      Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+      String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
+      return trashDirectory;
+    }
+    return null;
   }
 
   /**
@@ -638,6 +688,7 @@ public class BlockPoolSliceStorage extends Storage {
     for (StorageDirectory sd : storageDirs) {
       File trashRoot = getTrashRootDir(sd);
       try {
+        Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
         restoreBlockFilesFromTrash(trashRoot);
         FileUtil.fullyDelete(getTrashRootDir(sd));
       } catch (IOException ioe) {
@@ -656,4 +707,49 @@ public class BlockPoolSliceStorage extends Storage {
     }
     return false;
   }
+
+  /**
+   * Create a rolling upgrade marker file for each BP storage root, if it
+   * does not exist already.
+   */
+  public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
+      throws IOException {
+    for (StorageDirectory sd : dnStorageDirs) {
+      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
+      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
+      if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
+        if (!markerFile.exists() && markerFile.createNewFile()) {
+          LOG.info("Created " + markerFile);
+        } else {
+          LOG.info(markerFile + " already exists.");
+        }
+        storagesWithRollingUpgradeMarker.add(bpRoot.toString());
+        storagesWithoutRollingUpgradeMarker.remove(bpRoot.toString());
+      }
+    }
+  }
+
+  /**
+   * Check whether the rolling upgrade marker file exists for each BP storage
+   * root. If it does exist, then the marker file is deleted and, more
+   * importantly, the layout upgrade is finalized.
+ */ + public void clearRollingUpgradeMarkers(List dnStorageDirs) + throws IOException { + for (StorageDirectory sd : dnStorageDirs) { + File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir()); + File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE); + if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) { + if (markerFile.exists()) { + LOG.info("Deleting " + markerFile); + doFinalize(sd.getCurrentDir()); + if (!markerFile.delete()) { + LOG.warn("Failed to delete " + markerFile); + } + } + storagesWithoutRollingUpgradeMarker.add(bpRoot.toString()); + storagesWithRollingUpgradeMarker.remove(bpRoot.toString()); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index f382a9e7eb8..4383e56153a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -83,11 +83,17 @@ public class DataStorage extends Storage { public final static String STORAGE_DIR_FINALIZED = "finalized"; public final static String STORAGE_DIR_TMP = "tmp"; - // Set of bpids for which 'trash' is currently enabled. - // When trash is enabled block files are moved under a separate - // 'trash' folder instead of being deleted right away. This can - // be useful during rolling upgrades, for example. - // The set is backed by a concurrent HashMap. + /** + * Set of bpids for which 'trash' is currently enabled. + * When trash is enabled block files are moved under a separate + * 'trash' folder instead of being deleted right away. This can + * be useful during rolling upgrades, for example. + * The set is backed by a concurrent HashMap. + * + * Even if trash is enabled, it is not used if a layout upgrade + * is in progress for a storage directory i.e. if the previous + * directory exists. + */ private Set trashEnabledBpids; /** @@ -136,7 +142,9 @@ public class DataStorage extends Storage { } /** - * Enable trash for the specified block pool storage. + * Enable trash for the specified block pool storage. Even if trash is + * enabled by the caller, it is superseded by the 'previous' directory + * if a layout upgrade is in progress. */ public void enableTrash(String bpid) { if (trashEnabledBpids.add(bpid)) { @@ -156,6 +164,14 @@ public class DataStorage extends Storage { return trashEnabledBpids.contains(bpid); } + public void setRollingUpgradeMarker(String bpid) throws IOException { + getBPStorage(bpid).setRollingUpgradeMarkers(storageDirs); + } + + public void clearRollingUpgradeMarker(String bpid) throws IOException { + getBPStorage(bpid).clearRollingUpgradeMarkers(storageDirs); + } + /** * If rolling upgrades are in progress then do not delete block files * immediately. Instead we move the block files to an intermediate @@ -688,7 +704,8 @@ public class DataStorage extends Storage { if (DataNodeLayoutVersion.supports( LayoutVersion.Feature.FEDERATION, layoutVersion)) { // The VERSION file is already read in. Override the layoutVersion - // field and overwrite the file. + // field and overwrite the file. 
+        // {@link BlockPoolSliceStorage#doUpgrade}.
         LOG.info("Updating layout version from " + layoutVersion + " to "
             + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage "
             + sd.getRoot());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 0fbfe190869..553208eeafd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -443,6 +443,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public boolean trashEnabled(String bpid);
 
+  /**
+   * Create a marker file indicating that a rolling upgrade is in progress.
+   */
+  public void setRollingUpgradeMarker(String bpid) throws IOException;
+
+  /**
+   * Delete the rolling upgrade marker file if it exists.
+   * @param bpid the block pool ID
+   */
+  public void clearRollingUpgradeMarker(String bpid) throws IOException;
+
   /**
    * submit a sync_file_range request to AsyncDiskService
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 4511f21c513..05c99143c8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2039,6 +2039,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dataStorage.trashEnabled(bpid);
   }
 
+  @Override
+  public void setRollingUpgradeMarker(String bpid) throws IOException {
+    dataStorage.setRollingUpgradeMarker(bpid);
+  }
+
+  @Override
+  public void clearRollingUpgradeMarker(String bpid) throws IOException {
+    dataStorage.clearRollingUpgradeMarker(bpid);
+  }
+
   @Override
   public RollingLogs createRollingLogs(String bpid, String prefix
       ) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a57dd2f9cdc..e3a265fcab0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
@@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -77,6 +79,8 @@ import org.apache.hadoop.util.VersionInfo;
 import org.junit.Assume;
 
 import java.io.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.net.*;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
@@ -1400,4 +1404,38 @@ public class DFSTestUtil {
     }
     return expectedPrimary.getDatanodeDescriptor();
   }
+
+  public static void addDataNodeLayoutVersion(final int lv, final String description)
+      throws NoSuchFieldException, IllegalAccessException {
+    Preconditions.checkState(lv < DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    // Override {@link DataNodeLayoutVersion#CURRENT_LAYOUT_VERSION} via reflection.
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    Field field = DataNodeLayoutVersion.class.getField("CURRENT_LAYOUT_VERSION");
+    field.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    field.setInt(null, lv);
+
+    // Override {@link HdfsConstants#DATANODE_LAYOUT_VERSION}
+    field = HdfsConstants.class.getField("DATANODE_LAYOUT_VERSION");
+    field.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    field.setInt(null, lv);
+
+    // Inject the feature into the FEATURES map.
+    final LayoutVersion.FeatureInfo featureInfo =
+        new LayoutVersion.FeatureInfo(lv, lv + 1, description, false);
+    final LayoutVersion.LayoutFeature feature =
+        new LayoutVersion.LayoutFeature() {
+      @Override
+      public LayoutVersion.FeatureInfo getInfo() {
+        return featureInfo;
+      }
+    };
+
+    // Update the FEATURES map with the new layout version.
+    LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
+        new LayoutVersion.LayoutFeature[] { feature });
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a51342ea751..8ad4510d0d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1072,6 +1072,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return false;
   }
 
+  @Override
+  public void setRollingUpgradeMarker(String bpid) {
+  }
+
+  @Override
+  public void clearRollingUpgradeMarker(String bpid) {
+  }
+
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FsVolumeSpi vol) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
index f58f4710825..befb298e3b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -28,6 +29,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; /** @@ -64,7 +68,7 @@ public class TestDataNodeRollingUpgrade { Configuration conf; MiniDFSCluster cluster = null; DistributedFileSystem fs = null; - DataNode dn = null; + DataNode dn0 = null; NameNode nn = null; String blockPoolId = null; @@ -76,8 +80,8 @@ public class TestDataNodeRollingUpgrade { fs = cluster.getFileSystem(); nn = cluster.getNameNode(0); assertNotNull(nn); - dn = cluster.getDataNodes().get(0); - assertNotNull(dn); + dn0 = cluster.getDataNodes().get(0); + assertNotNull(dn0); blockPoolId = cluster.getNameNode(0).getNamesystem().getBlockPoolId(); } @@ -88,7 +92,7 @@ public class TestDataNodeRollingUpgrade { } fs = null; nn = null; - dn = null; + dn0 = null; blockPoolId = null; } @@ -103,9 +107,10 @@ public class TestDataNodeRollingUpgrade { private File getBlockForFile(Path path, boolean exists) throws IOException { LocatedBlocks blocks = nn.getRpcServer().getBlockLocations(path.toString(), 0, Long.MAX_VALUE); - assertEquals(1, blocks.getLocatedBlocks().size()); + assertEquals("The test helper functions assume that each file has a single block", + 1, blocks.getLocatedBlocks().size()); ExtendedBlock block = blocks.getLocatedBlocks().get(0).getBlock(); - BlockLocalPathInfo bInfo = dn.getFSDataset().getBlockLocalPathInfo(block); + BlockLocalPathInfo bInfo = dn0.getFSDataset().getBlockLocalPathInfo(block); File blockFile = new File(bInfo.getBlockPath()); assertEquals(exists, blockFile.exists()); return blockFile; @@ -113,7 +118,7 @@ public class TestDataNodeRollingUpgrade { private File getTrashFileForBlock(File blockFile, boolean exists) { File trashFile = new File( - dn.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile)); + dn0.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile)); assertEquals(exists, trashFile.exists()); return trashFile; } @@ -135,11 +140,10 @@ public class TestDataNodeRollingUpgrade { assertFalse(blockFile.exists()); } - private void ensureTrashDisabled() { + private boolean isTrashRootPresent() { // Trash is disabled; trash root does not exist - assertFalse(dn.getFSDataset().trashEnabled(blockPoolId)); - BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId); - assertFalse(bps.trashEnabled()); + BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId); + return bps.trashEnabled(); } /** @@ -149,17 +153,25 @@ public class TestDataNodeRollingUpgrade { throws Exception { assertTrue(blockFile.exists()); assertFalse(trashFile.exists()); - ensureTrashDisabled(); + assertFalse(isTrashRootPresent()); + } + + private boolean isBlockFileInPrevious(File blockFile) { + Pattern blockFilePattern = Pattern.compile("^(.*/current/.*/)(current)(/.*)$"); + Matcher matcher = blockFilePattern.matcher(blockFile.toString()); + String previousFileName = matcher.replaceFirst("$1" + "previous" + "$3"); + return ((new File(previousFileName)).exists()); } private void startRollingUpgrade() throws Exception { LOG.info("Starting rolling upgrade"); + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); final DFSAdmin dfsadmin = new DFSAdmin(conf); TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare"); triggerHeartBeats(); 
// Ensure datanode rolling upgrade is started - assertTrue(dn.getFSDataset().trashEnabled(blockPoolId)); + assertTrue(dn0.getFSDataset().trashEnabled(blockPoolId)); } private void finalizeRollingUpgrade() throws Exception { @@ -169,8 +181,8 @@ public class TestDataNodeRollingUpgrade { triggerHeartBeats(); // Ensure datanode rolling upgrade is started - assertFalse(dn.getFSDataset().trashEnabled(blockPoolId)); - BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId); + assertFalse(dn0.getFSDataset().trashEnabled(blockPoolId)); + BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId); assertFalse(bps.trashEnabled()); } @@ -179,13 +191,15 @@ public class TestDataNodeRollingUpgrade { // Restart the namenode with rolling upgrade rollback LOG.info("Starting rollback of the rolling upgrade"); MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0); + dnprop.setDnArgs("-rollback"); cluster.shutdownNameNodes(); cluster.restartNameNode("-rollingupgrade", "rollback"); cluster.restartDataNode(dnprop); cluster.waitActive(); nn = cluster.getNameNode(0); - dn = cluster.getDataNodes().get(0); + dn0 = cluster.getDataNodes().get(0); triggerHeartBeats(); + LOG.info("The cluster is active after rollback"); } @Test (timeout=600000) @@ -194,12 +208,11 @@ public class TestDataNodeRollingUpgrade { startCluster(); // Create files in DFS. - Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat"); - Path testFile2 = new Path("/TestDataNodeRollingUpgrade2.dat"); + Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat"); + Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat"); DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED); DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED); - fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); startRollingUpgrade(); File blockFile = getBlockForFile(testFile2, true); File trashFile = getTrashFileForBlock(blockFile, false); @@ -207,7 +220,7 @@ public class TestDataNodeRollingUpgrade { finalizeRollingUpgrade(); // Ensure that delete file testFile2 stays deleted after finalize - ensureTrashDisabled(); + assertFalse(isTrashRootPresent()); assert(!fs.exists(testFile2)); assert(fs.exists(testFile1)); @@ -222,11 +235,10 @@ public class TestDataNodeRollingUpgrade { startCluster(); // Create files in DFS. - Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat"); + Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat"); DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED); String fileContents1 = DFSTestUtil.readFile(fs, testFile1); - fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); startRollingUpgrade(); File blockFile = getBlockForFile(testFile1, true); @@ -255,9 +267,9 @@ public class TestDataNodeRollingUpgrade { startCluster(); // Create files in DFS. 
- String testFile1 = "/TestDataNodeXceiver1.dat"; - String testFile2 = "/TestDataNodeXceiver2.dat"; - String testFile3 = "/TestDataNodeXceiver3.dat"; + String testFile1 = "/" + GenericTestUtils.getMethodName() + ".01.dat"; + String testFile2 = "/" + GenericTestUtils.getMethodName() + ".02.dat"; + String testFile3 = "/" + GenericTestUtils.getMethodName() + ".03.dat"; DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf); DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf); @@ -277,12 +289,12 @@ public class TestDataNodeRollingUpgrade { s3.write(toWrite, 0, 1024*1024*8); s3.flush(); - assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer() + assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer() .getNumPeersXceiver()); s1.close(); s2.close(); s3.close(); - assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer() + assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer() .getNumPeersXceiver()); client1.close(); client2.close(); @@ -291,4 +303,143 @@ public class TestDataNodeRollingUpgrade { shutdownCluster(); } } + + /** + * Support for layout version change with rolling upgrade was + * added by HDFS-6800 and HDFS-6981. + */ + @Test(timeout=300000) + public void testWithLayoutChangeAndFinalize() throws Exception { + final long seed = 0x600DF00D; + try { + startCluster(); + + Path[] paths = new Path[3]; + File[] blockFiles = new File[3]; + + // Create two files in DFS. + for (int i = 0; i < 2; ++i) { + paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat"); + DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 2, seed); + } + + startRollingUpgrade(); + + // Delete the first file. The DN will save its block files in trash. + blockFiles[0] = getBlockForFile(paths[0], true); + File trashFile0 = getTrashFileForBlock(blockFiles[0], false); + deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0); + + // Restart the DN with a new layout version to trigger layout upgrade. + LOG.info("Shutting down the Datanode"); + MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0); + DFSTestUtil.addDataNodeLayoutVersion( + DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1, + "Test Layout for TestDataNodeRollingUpgrade"); + LOG.info("Restarting the DataNode"); + cluster.restartDataNode(dnprop, true); + cluster.waitActive(); + + dn0 = cluster.getDataNodes().get(0); + LOG.info("The DN has been restarted"); + assertFalse(trashFile0.exists()); + assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0])); + + // Ensure that the block file for the first file was moved from 'trash' to 'previous'. + assertTrue(isBlockFileInPrevious(blockFiles[0])); + assertFalse(isTrashRootPresent()); + + // Delete the second file. Ensure that its block file is in previous. + blockFiles[1] = getBlockForFile(paths[1], true); + fs.delete(paths[1], false); + assertTrue(isBlockFileInPrevious(blockFiles[1])); + assertFalse(isTrashRootPresent()); + + // Rollback and ensure that neither block file exists in trash or previous. + finalizeRollingUpgrade(); + assertFalse(isTrashRootPresent()); + assertFalse(isBlockFileInPrevious(blockFiles[0])); + assertFalse(isBlockFileInPrevious(blockFiles[1])); + } finally { + shutdownCluster(); + } + } + + /** + * Support for layout version change with rolling upgrade was + * added by HDFS-6800 and HDFS-6981. 
+ */ + @Test(timeout=300000) + public void testWithLayoutChangeAndRollback() throws Exception { + final long seed = 0x600DF00D; + try { + startCluster(); + + Path[] paths = new Path[3]; + File[] blockFiles = new File[3]; + + // Create two files in DFS. + for (int i = 0; i < 2; ++i) { + paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat"); + DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 1, seed); + } + + startRollingUpgrade(); + + // Delete the first file. The DN will save its block files in trash. + blockFiles[0] = getBlockForFile(paths[0], true); + File trashFile0 = getTrashFileForBlock(blockFiles[0], false); + deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0); + + // Restart the DN with a new layout version to trigger layout upgrade. + LOG.info("Shutting down the Datanode"); + MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0); + DFSTestUtil.addDataNodeLayoutVersion( + DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1, + "Test Layout for TestDataNodeRollingUpgrade"); + LOG.info("Restarting the DataNode"); + cluster.restartDataNode(dnprop, true); + cluster.waitActive(); + + dn0 = cluster.getDataNodes().get(0); + LOG.info("The DN has been restarted"); + assertFalse(trashFile0.exists()); + assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0])); + + // Ensure that the block file for the first file was moved from 'trash' to 'previous'. + assertTrue(isBlockFileInPrevious(blockFiles[0])); + assertFalse(isTrashRootPresent()); + + // Delete the second file. Ensure that its block file is in previous. + blockFiles[1] = getBlockForFile(paths[1], true); + fs.delete(paths[1], false); + assertTrue(isBlockFileInPrevious(blockFiles[1])); + assertFalse(isTrashRootPresent()); + + // Create and delete a third file. Its block file should not be + // in either trash or previous after deletion. + paths[2] = new Path("/" + GenericTestUtils.getMethodName() + ".2.dat"); + DFSTestUtil.createFile(fs, paths[2], BLOCK_SIZE, (short) 1, seed); + blockFiles[2] = getBlockForFile(paths[2], true); + fs.delete(paths[2], false); + assertFalse(isBlockFileInPrevious(blockFiles[2])); + assertFalse(isTrashRootPresent()); + + // Rollback and ensure that the first two file contents were restored. + rollbackRollingUpgrade(); + for (int i = 0; i < 2; ++i) { + byte[] actual = DFSTestUtil.readFileBuffer(fs, paths[i]); + byte[] calculated = DFSTestUtil.calculateFileContentsFromSeed(seed, BLOCK_SIZE); + assertArrayEquals(actual, calculated); + } + + // And none of the block files must be in previous or trash. + assertFalse(isTrashRootPresent()); + for (int i = 0; i < 3; ++i) { + assertFalse(isBlockFileInPrevious(blockFiles[i])); + } + } finally { + shutdownCluster(); + } + } }