Merge branch 'trunk' into HDFS-6581
Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
Commit: 61ccd50362
@@ -262,6 +262,8 @@ Trunk (Unreleased)
     HDFS-6893. crypto subcommand is not sorted properly in hdfs's hadoop_usage
     (David Luo via aw)
 
+    HDFS-6981. Fix DN upgrade with layout version change. (Arpit Agarwal)
+
 Release 2.6.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -629,6 +631,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-7005. DFS input streams do not timeout.
 
+    HDFS-6951. Correctly persist raw namespace xattrs to edit log and fsimage.
+    (clamb via wang)
+
   BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
 
     HDFS-6387. HDFS CLI admin tool for creating & deleting an
@@ -468,11 +468,14 @@ class BPOfferService {
    * Signal the current rolling upgrade status as indicated by the NN.
    * @param inProgress true if a rolling upgrade is in progress
    */
-  void signalRollingUpgrade(boolean inProgress) {
+  void signalRollingUpgrade(boolean inProgress) throws IOException {
+    String bpid = getBlockPoolId();
     if (inProgress) {
-      dn.getFSDataset().enableTrash(getBlockPoolId());
+      dn.getFSDataset().enableTrash(bpid);
+      dn.getFSDataset().setRollingUpgradeMarker(bpid);
     } else {
-      dn.getFSDataset().restoreTrash(getBlockPoolId());
+      dn.getFSDataset().restoreTrash(bpid);
+      dn.getFSDataset().clearRollingUpgradeMarker(bpid);
     }
   }
 
@@ -627,7 +627,7 @@ class BPServiceActor implements Runnable {
     bpos.shutdownActor(this);
   }
 
-  private void handleRollingUpgradeStatus(HeartbeatResponse resp) {
+  private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
     RollingUpgradeStatus rollingUpgradeStatus = resp.getRollingUpdateStatus();
     if (rollingUpgradeStatus != null &&
         rollingUpgradeStatus.getBlockPoolId().compareTo(bpos.getBlockPoolId()) != 0) {
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
@@ -38,8 +39,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -62,6 +66,18 @@ import java.util.regex.Pattern;
 public class BlockPoolSliceStorage extends Storage {
   static final String TRASH_ROOT_DIR = "trash";
 
+  /**
+   * A marker file that is created on each root directory if a rolling upgrade
+   * is in progress. The NN does not inform the DN when a rolling upgrade is
+   * finalized. All the DN can infer is whether or not a rolling upgrade is
+   * currently in progress. When the rolling upgrade is not in progress:
+   *   1. If the marker file is present, then a rolling upgrade just completed.
+   *      If a 'previous' directory exists, it can be deleted now.
+   *   2. If the marker file is absent, then a regular upgrade may be in
+   *      progress. Do not delete the 'previous' directory.
+   */
+  static final String ROLLING_UPGRADE_MARKER_FILE = "RollingUpgradeInProgress";
+
   private static final String BLOCK_POOL_ID_PATTERN_BASE =
       Pattern.quote(File.separator) +
       "BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+" +
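Note (illustrative only, not part of the patch): the marker-file inference described in the comment above boils down to the following sketch for a single block-pool storage root. The helper name and the two File parameters are hypothetical; the real logic lives in clearRollingUpgradeMarkers() and doTransition() shown further down.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;

class RollingUpgradeMarkerSketch {
  // Called when the NN reports that no rolling upgrade is in progress.
  static void onNoRollingUpgradeInProgress(File markerFile, File previousDir)
      throws IOException {
    if (markerFile.exists()) {
      // Marker present: a rolling upgrade just completed, so the layout
      // upgrade can be finalized and a leftover 'previous' directory
      // may be discarded.
      if (previousDir.exists() && !FileUtil.fullyDelete(previousDir)) {
        throw new IOException("Could not delete " + previousDir);
      }
      if (!markerFile.delete()) {
        throw new IOException("Could not delete " + markerFile);
      }
    }
    // Marker absent: a regular (non-rolling) upgrade may be in progress,
    // so 'previous' must be left alone.
  }
}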
@@ -83,6 +99,13 @@ public class BlockPoolSliceStorage extends Storage {
     blockpoolID = bpid;
   }
 
+  /**
+   * These maps are used as an optimization to avoid one filesystem operation
+   * per storage on each heartbeat response.
+   */
+  private static Set<String> storagesWithRollingUpgradeMarker;
+  private static Set<String> storagesWithoutRollingUpgradeMarker;
+
   BlockPoolSliceStorage(int namespaceID, String bpID, long cTime,
       String clusterId) {
     super(NodeType.DATA_NODE);
@@ -90,10 +113,18 @@ public class BlockPoolSliceStorage extends Storage {
     this.blockpoolID = bpID;
     this.cTime = cTime;
     this.clusterID = clusterId;
+    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
+    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
 
   private BlockPoolSliceStorage() {
     super(NodeType.DATA_NODE);
+    storagesWithRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
+    storagesWithoutRollingUpgradeMarker = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
 
   /**
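Aside (not part of the patch): the constructors above build concurrent sets with the standard JDK idiom of Collections.newSetFromMap backed by a ConcurrentHashMap. A self-contained example of the same idiom:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSetExample {
  public static void main(String[] args) {
    // newSetFromMap returns a Set view of the map's keys, so a
    // ConcurrentHashMap backing yields a thread-safe Set<String>.
    Set<String> storageRoots = Collections.newSetFromMap(
        new ConcurrentHashMap<String, Boolean>());
    storageRoots.add("/data/dn1/current/BP-1-127.0.0.1-1");
    System.out.println(storageRoots.contains("/data/dn1/current/BP-1-127.0.0.1-1"));
  }
}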
@@ -270,13 +301,9 @@ public class BlockPoolSliceStorage extends Storage {
   private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
-      // we will already restore everything in the trash by rolling back to
-      // the previous directory, so we must delete the trash to ensure
-      // that it's not restored by BPOfferService.signalRollingUpgrade()
-      if (!FileUtil.fullyDelete(getTrashRootDir(sd))) {
-        throw new IOException("Unable to delete trash directory prior to " +
-            "restoration of previous directory: " + getTrashRootDir(sd));
-      }
+      Preconditions.checkState(!getTrashRootDir(sd).exists(),
+          sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
+          " both be present.");
       doRollback(sd, nsInfo); // rollback if applicable
     } else {
       // Restore all the files in the trash. The restored files are retained
@@ -440,10 +467,18 @@ public class BlockPoolSliceStorage extends Storage {
       }
 
       final File newChild = new File(restoreDirectory, child.getName());
-      if (!child.renameTo(newChild)) {
+      if (newChild.exists() && newChild.length() >= child.length()) {
+        // Failsafe - we should not hit this case but let's make sure
+        // we never overwrite a newer version of a block file with an
+        // older version.
+        LOG.info("Not overwriting " + newChild + " with smaller file from " +
+                     "trash directory. This message can be safely ignored.");
+      } else if (!child.renameTo(newChild)) {
         throw new IOException("Failed to rename " + child + " to " + newChild);
+      } else {
+        ++filesRestored;
       }
-      ++filesRestored;
     }
     FileUtil.fullyDelete(trashRoot);
     return filesRestored;
@@ -599,6 +634,18 @@ public class BlockPoolSliceStorage extends Storage {
     return new File(sd.getRoot(), TRASH_ROOT_DIR);
   }
 
+  /**
+   * Determine whether we can use trash for the given blockFile. Trash
+   * is disallowed if a 'previous' directory exists for the
+   * storage directory containing the block.
+   */
+  @VisibleForTesting
+  public boolean isTrashAllowed(File blockFile) {
+    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+    String previousDir = matcher.replaceFirst("$1$2" + STORAGE_DIR_PREVIOUS);
+    return !(new File(previousDir)).exists();
+  }
+
   /**
    * Get a target subdirectory under trash/ for a given block file that is being
    * deleted.
@@ -609,9 +656,12 @@ public class BlockPoolSliceStorage extends Storage {
    * @return the trash directory for a given block file that is being deleted.
    */
   public String getTrashDirectory(File blockFile) {
-    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
-    String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
-    return trashDirectory;
+    if (isTrashAllowed(blockFile)) {
+      Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+      String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
+      return trashDirectory;
+    }
+    return null;
   }
 
   /**
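To make the rewrite in getTrashDirectory() concrete, here is a standalone sketch. The pattern below is a simplified stand-in for BLOCK_POOL_CURRENT_PATH_PATTERN and the sample path is made up; only the replaceFirst mechanics match the patch.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TrashPathSketch {
  public static void main(String[] args) {
    // Simplified pattern: group 1 = storage prefix, group 2 = "/BP-.../",
    // group 3 = the "current" component, group 4 = the rest of the path.
    Pattern p = Pattern.compile("^(.*)(/BP-[^/]+/)(current)(/.*)$");
    String blockDir =
        "/data/dn1/current/BP-1-127.0.0.1-1/current/finalized/subdir0";
    Matcher m = p.matcher(blockDir);
    // Swap the inner "current" component for "trash", keeping everything else.
    String trashDir = m.replaceFirst("$1$2" + "trash" + "$4");
    System.out.println(trashDir);
    // -> /data/dn1/current/BP-1-127.0.0.1-1/trash/finalized/subdir0
  }
}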
@@ -638,6 +688,7 @@ public class BlockPoolSliceStorage extends Storage {
     for (StorageDirectory sd : storageDirs) {
       File trashRoot = getTrashRootDir(sd);
       try {
+        Preconditions.checkState(!(trashRoot.exists() && sd.getPreviousDir().exists()));
         restoreBlockFilesFromTrash(trashRoot);
         FileUtil.fullyDelete(getTrashRootDir(sd));
       } catch (IOException ioe) {
@@ -656,4 +707,49 @@ public class BlockPoolSliceStorage extends Storage {
     }
     return false;
   }
+
+  /**
+   * Create a rolling upgrade marker file for each BP storage root, if it
+   * does not exist already.
+   */
+  public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
+      throws IOException {
+    for (StorageDirectory sd : dnStorageDirs) {
+      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
+      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
+      if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
+        if (!markerFile.exists() && markerFile.createNewFile()) {
+          LOG.info("Created " + markerFile);
+        } else {
+          LOG.info(markerFile + " already exists.");
+        }
+        storagesWithRollingUpgradeMarker.add(bpRoot.toString());
+        storagesWithoutRollingUpgradeMarker.remove(bpRoot.toString());
+      }
+    }
+  }
+
+  /**
+   * Check whether the rolling upgrade marker file exists for each BP storage
+   * root. If it does exist, then the marker file is cleared and more
+   * importantly the layout upgrade is finalized.
+   */
+  public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
+      throws IOException {
+    for (StorageDirectory sd : dnStorageDirs) {
+      File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
+      File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
+      if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {
+        if (markerFile.exists()) {
+          LOG.info("Deleting " + markerFile);
+          doFinalize(sd.getCurrentDir());
+          if (!markerFile.delete()) {
+            LOG.warn("Failed to delete " + markerFile);
+          }
+        }
+        storagesWithoutRollingUpgradeMarker.add(bpRoot.toString());
+        storagesWithRollingUpgradeMarker.remove(bpRoot.toString());
+      }
+    }
+  }
 }
@@ -84,11 +84,17 @@ public class DataStorage extends Storage {
   public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
   public final static String STORAGE_DIR_TMP = "tmp";
 
-  // Set of bpids for which 'trash' is currently enabled.
-  // When trash is enabled block files are moved under a separate
-  // 'trash' folder instead of being deleted right away. This can
-  // be useful during rolling upgrades, for example.
-  // The set is backed by a concurrent HashMap.
+  /**
+   * Set of bpids for which 'trash' is currently enabled.
+   * When trash is enabled block files are moved under a separate
+   * 'trash' folder instead of being deleted right away. This can
+   * be useful during rolling upgrades, for example.
+   * The set is backed by a concurrent HashMap.
+   *
+   * Even if trash is enabled, it is not used if a layout upgrade
+   * is in progress for a storage directory i.e. if the previous
+   * directory exists.
+   */
   private Set<String> trashEnabledBpids;
 
   /**
@@ -137,7 +143,9 @@ public class DataStorage extends Storage {
   }
 
   /**
-   * Enable trash for the specified block pool storage.
+   * Enable trash for the specified block pool storage. Even if trash is
+   * enabled by the caller, it is superseded by the 'previous' directory
+   * if a layout upgrade is in progress.
    */
   public void enableTrash(String bpid) {
     if (trashEnabledBpids.add(bpid)) {
@@ -157,6 +165,14 @@ public class DataStorage extends Storage {
     return trashEnabledBpids.contains(bpid);
   }
 
+  public void setRollingUpgradeMarker(String bpid) throws IOException {
+    getBPStorage(bpid).setRollingUpgradeMarkers(storageDirs);
+  }
+
+  public void clearRollingUpgradeMarker(String bpid) throws IOException {
+    getBPStorage(bpid).clearRollingUpgradeMarkers(storageDirs);
+  }
+
   /**
    * If rolling upgrades are in progress then do not delete block files
    * immediately. Instead we move the block files to an intermediate
@@ -689,7 +705,8 @@ public class DataStorage extends Storage {
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       // The VERSION file is already read in. Override the layoutVersion
-      // field and overwrite the file.
+      // field and overwrite the file. The upgrade work is handled by
+      // {@link BlockPoolSliceStorage#doUpgrade}
       LOG.info("Updating layout version from " + layoutVersion + " to "
           + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage "
           + sd.getRoot());
@@ -446,6 +446,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    */
   public boolean trashEnabled(String bpid);
 
+  /**
+   * Create a marker file indicating that a rolling upgrade is in progress.
+   */
+  public void setRollingUpgradeMarker(String bpid) throws IOException;
+
+  /**
+   * Delete the rolling upgrade marker file if it exists.
+   * @param bpid
+   */
+  public void clearRollingUpgradeMarker(String bpid) throws IOException;
+
   /**
    * submit a sync_file_range request to AsyncDiskService
    */
@@ -2153,6 +2153,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dataStorage.trashEnabled(bpid);
   }
 
+  @Override
+  public void setRollingUpgradeMarker(String bpid) throws IOException {
+    dataStorage.setRollingUpgradeMarker(bpid);
+  }
+
+  @Override
+  public void clearRollingUpgradeMarker(String bpid) throws IOException {
+    dataStorage.clearRollingUpgradeMarker(bpid);
+  }
+
   @Override
   public RollingLogs createRollingLogs(String bpid, String prefix
       ) throws IOException {
@@ -111,6 +111,18 @@ public class EncryptionZoneManager {
    */
   void addEncryptionZone(Long inodeId, String keyName) {
     assert dir.hasWriteLock();
+    unprotectedAddEncryptionZone(inodeId, keyName);
+  }
+
+  /**
+   * Add a new encryption zone.
+   * <p/>
+   * Does not assume that the FSDirectory lock is held.
+   *
+   * @param inodeId of the encryption zone
+   * @param keyName encryption zone key name
+   */
+  void unprotectedAddEncryptionZone(Long inodeId, String keyName) {
     final EncryptionZoneInt ez = new EncryptionZoneInt(inodeId, keyName);
     encryptionZones.put(inodeId, ez);
   }
@@ -2102,7 +2102,7 @@ public class FSDirectory implements Closeable {
     for (XAttr xattr : xattrs) {
       final String xaName = XAttrHelper.getPrefixName(xattr);
       if (CRYPTO_XATTR_ENCRYPTION_ZONE.equals(xaName)) {
-        ezManager.addEncryptionZone(inode.getId(),
+        ezManager.unprotectedAddEncryptionZone(inode.getId(),
            new String(xattr.getValue()));
       }
     }
@@ -82,7 +82,12 @@ public final class FSImageFormatPBINode {
   private static final int XATTR_NAMESPACE_OFFSET = 30;
   private static final int XATTR_NAME_MASK = (1 << 24) - 1;
   private static final int XATTR_NAME_OFFSET = 6;
-  private static final XAttr.NameSpace[] XATTR_NAMESPACE_VALUES =
+
+  /* See the comments in fsimage.proto for an explanation of the following. */
+  private static final int XATTR_NAMESPACE_EXT_OFFSET = 5;
+  private static final int XATTR_NAMESPACE_EXT_MASK = 1;
+
+  private static final XAttr.NameSpace[] XATTR_NAMESPACE_VALUES =
       XAttr.NameSpace.values();
 
@@ -122,6 +127,8 @@ public final class FSImageFormatPBINode {
       int v = xAttrCompactProto.getName();
       int nid = (v >> XATTR_NAME_OFFSET) & XATTR_NAME_MASK;
       int ns = (v >> XATTR_NAMESPACE_OFFSET) & XATTR_NAMESPACE_MASK;
+      ns |=
+          ((v >> XATTR_NAMESPACE_EXT_OFFSET) & XATTR_NAMESPACE_EXT_MASK) << 2;
       String name = stringTable[nid];
       byte[] value = null;
       if (xAttrCompactProto.getValue() != null) {
@@ -371,10 +378,13 @@ public final class FSImageFormatPBINode {
       for (XAttr a : f.getXAttrs()) {
         XAttrCompactProto.Builder xAttrCompactBuilder = XAttrCompactProto.
             newBuilder();
-        int v = ((a.getNameSpace().ordinal() & XATTR_NAMESPACE_MASK) <<
-            XATTR_NAMESPACE_OFFSET)
-            | ((stringMap.getId(a.getName()) & XATTR_NAME_MASK) <<
+        int nsOrd = a.getNameSpace().ordinal();
+        Preconditions.checkArgument(nsOrd < 8, "Too many namespaces.");
+        int v = ((nsOrd & XATTR_NAMESPACE_MASK) << XATTR_NAMESPACE_OFFSET)
+            | ((stringMap.getId(a.getName()) & XATTR_NAME_MASK) <<
                 XATTR_NAME_OFFSET);
+        v |= (((nsOrd >> 2) & XATTR_NAMESPACE_EXT_MASK) <<
+            XATTR_NAMESPACE_EXT_OFFSET);
         xAttrCompactBuilder.setName(v);
         if (a.getValue() != null) {
           xAttrCompactBuilder.setValue(PBHelper.getByteString(a.getValue()));
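A worked example of the name-field packing changed above (standalone sketch; XATTR_NAMESPACE_MASK is assumed to be the two-bit mask 3, which this hunk does not show):

public class XAttrNamePackingSketch {
  static final int XATTR_NAMESPACE_MASK = 3;        // assumed two-bit mask
  static final int XATTR_NAMESPACE_OFFSET = 30;
  static final int XATTR_NAME_MASK = (1 << 24) - 1;
  static final int XATTR_NAME_OFFSET = 6;
  static final int XATTR_NAMESPACE_EXT_OFFSET = 5;
  static final int XATTR_NAMESPACE_EXT_MASK = 1;

  static int pack(int nsOrd, int nameId) {
    int v = ((nsOrd & XATTR_NAMESPACE_MASK) << XATTR_NAMESPACE_OFFSET)
        | ((nameId & XATTR_NAME_MASK) << XATTR_NAME_OFFSET);
    // Third namespace bit (needed once the 5th namespace, 'raw', exists)
    // lands in the formerly reserved bit.
    v |= (((nsOrd >> 2) & XATTR_NAMESPACE_EXT_MASK) << XATTR_NAMESPACE_EXT_OFFSET);
    return v;
  }

  static int unpackNamespace(int v) {
    int ns = (v >> XATTR_NAMESPACE_OFFSET) & XATTR_NAMESPACE_MASK;
    ns |= ((v >> XATTR_NAMESPACE_EXT_OFFSET) & XATTR_NAMESPACE_EXT_MASK) << 2;
    return ns;
  }

  public static void main(String[] args) {
    int v = pack(4, 12345);                          // ordinal 4 needs the extension bit
    System.out.println(unpackNamespace(v));          // 4
    System.out.println((v >> XATTR_NAME_OFFSET) & XATTR_NAME_MASK); // 12345
  }
}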
@@ -68,10 +68,10 @@ public class NameNodeLayoutVersion {
     XATTRS(-57, "Extended attributes"),
     CREATE_OVERWRITE(-58, "Use single editlog record for " +
       "creating file with overwrite"),
-    LAZY_PERSIST_FILES(-59, "Support for optional lazy persistence of "
-        + " files with reduced durability guarantees");
+    XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
+    LAZY_PERSIST_FILES(-60, "Support for optional lazy persistence of " +
+        " files with reduced durability guarantees");
 
     private final FeatureInfo info;
 
     /**
@@ -113,8 +113,12 @@ message INodeSection {
      *
      * [0:2) -- the namespace of XAttr (XAttrNamespaceProto)
      * [2:26) -- the name of the entry, which is an ID that points to a
     *           string in the StringTableSection.
-     * [26:32) -- reserved for future uses.
+     * [26:27) -- namespace extension. Originally there were only 4 namespaces
+     *            so only 2 bits were needed. At that time, this bit was reserved. When a
+     *            5th namespace was created (raw) this bit became used as a 3rd namespace
+     *            bit.
+     * [27:32) -- reserved for future uses.
      */
     required fixed32 name = 1;
     optional bytes value = 2;
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
@@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -77,6 +79,8 @@ import org.apache.hadoop.util.VersionInfo;
 import org.junit.Assume;
 
 import java.io.*;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.net.*;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
@@ -1419,4 +1423,38 @@ public class DFSTestUtil {
     }
     return expectedPrimary.getDatanodeDescriptor();
   }
+
+  public static void addDataNodeLayoutVersion(final int lv, final String description)
+      throws NoSuchFieldException, IllegalAccessException {
+    Preconditions.checkState(lv < DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    // Override {@link DataNodeLayoutVersion#CURRENT_LAYOUT_VERSION} via reflection.
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    Field field = DataNodeLayoutVersion.class.getField("CURRENT_LAYOUT_VERSION");
+    field.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    field.setInt(null, lv);
+
+    // Override {@link HdfsConstants#DATANODE_LAYOUT_VERSION}
+    field = HdfsConstants.class.getField("DATANODE_LAYOUT_VERSION");
+    field.setAccessible(true);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    field.setInt(null, lv);
+
+    // Inject the feature into the FEATURES map.
+    final LayoutVersion.FeatureInfo featureInfo =
+        new LayoutVersion.FeatureInfo(lv, lv + 1, description, false);
+    final LayoutVersion.LayoutFeature feature =
+        new LayoutVersion.LayoutFeature() {
+      @Override
+      public LayoutVersion.FeatureInfo getInfo() {
+        return featureInfo;
+      }
+    };
+
+    // Update the FEATURES map with the new layout version.
+    LayoutVersion.updateMap(DataNodeLayoutVersion.FEATURES,
+                            new LayoutVersion.LayoutFeature[] { feature });
+  }
 }
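The helper above rewrites static final fields reflectively; below is a standalone sketch of the same trick, assuming an older JDK (the Field "modifiers" hack used here is filtered out on recent Java versions).

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class FinalFieldOverrideSketch {
  static class Holder {
    public static final int VALUE = 42;
  }

  public static void main(String[] args) throws Exception {
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);

    Field field = Holder.class.getField("VALUE");
    field.setAccessible(true);
    // Clear the FINAL bit so Field.setInt accepts a new value.
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.setInt(null, 7);

    // Reflective reads see the new value; direct reads of Holder.VALUE may
    // still see 42 where the compiler inlined the constant.
    System.out.println(field.getInt(null));   // 7
  }
}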
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.EncryptionZoneManager;
 import org.apache.hadoop.security.AccessControlException;
@@ -314,6 +315,13 @@ public class TestEncryptionZones {
       assertNumZones(numZones);
       assertZonePresent(null, zonePath.toString());
     }
 
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+    cluster.restartNameNode(true);
+    assertNumZones(numZones);
+    assertZonePresent(null, zone1.toString());
   }
 
   /**
@@ -1078,6 +1078,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return false;
   }
 
+  @Override
+  public void setRollingUpgradeMarker(String bpid) {
+  }
+
+  @Override
+  public void clearRollingUpgradeMarker(String bpid) {
+  }
+
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -28,6 +29,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 /**
@@ -64,7 +68,7 @@ public class TestDataNodeRollingUpgrade {
   Configuration conf;
   MiniDFSCluster cluster = null;
   DistributedFileSystem fs = null;
-  DataNode dn = null;
+  DataNode dn0 = null;
   NameNode nn = null;
   String blockPoolId = null;
 
@@ -76,8 +80,8 @@ public class TestDataNodeRollingUpgrade {
     fs = cluster.getFileSystem();
     nn = cluster.getNameNode(0);
     assertNotNull(nn);
-    dn = cluster.getDataNodes().get(0);
-    assertNotNull(dn);
+    dn0 = cluster.getDataNodes().get(0);
+    assertNotNull(dn0);
     blockPoolId = cluster.getNameNode(0).getNamesystem().getBlockPoolId();
   }
 
@@ -88,7 +92,7 @@ public class TestDataNodeRollingUpgrade {
     }
     fs = null;
     nn = null;
-    dn = null;
+    dn0 = null;
     blockPoolId = null;
   }
 
@@ -103,9 +107,10 @@ public class TestDataNodeRollingUpgrade {
   private File getBlockForFile(Path path, boolean exists) throws IOException {
     LocatedBlocks blocks = nn.getRpcServer().getBlockLocations(path.toString(),
         0, Long.MAX_VALUE);
-    assertEquals(1, blocks.getLocatedBlocks().size());
+    assertEquals("The test helper functions assume that each file has a single block",
+                 1, blocks.getLocatedBlocks().size());
     ExtendedBlock block = blocks.getLocatedBlocks().get(0).getBlock();
-    BlockLocalPathInfo bInfo = dn.getFSDataset().getBlockLocalPathInfo(block);
+    BlockLocalPathInfo bInfo = dn0.getFSDataset().getBlockLocalPathInfo(block);
     File blockFile = new File(bInfo.getBlockPath());
     assertEquals(exists, blockFile.exists());
     return blockFile;
@@ -113,7 +118,7 @@ public class TestDataNodeRollingUpgrade {
 
   private File getTrashFileForBlock(File blockFile, boolean exists) {
     File trashFile = new File(
-        dn.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile));
+        dn0.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile));
     assertEquals(exists, trashFile.exists());
     return trashFile;
   }
@@ -135,11 +140,10 @@ public class TestDataNodeRollingUpgrade {
     assertFalse(blockFile.exists());
   }
 
-  private void ensureTrashDisabled() {
+  private boolean isTrashRootPresent() {
     // Trash is disabled; trash root does not exist
-    assertFalse(dn.getFSDataset().trashEnabled(blockPoolId));
-    BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId);
-    assertFalse(bps.trashEnabled());
+    BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId);
+    return bps.trashEnabled();
   }
 
   /**
@@ -149,17 +153,25 @@ public class TestDataNodeRollingUpgrade {
       throws Exception {
     assertTrue(blockFile.exists());
     assertFalse(trashFile.exists());
-    ensureTrashDisabled();
+    assertFalse(isTrashRootPresent());
+  }
+
+  private boolean isBlockFileInPrevious(File blockFile) {
+    Pattern blockFilePattern = Pattern.compile("^(.*/current/.*/)(current)(/.*)$");
+    Matcher matcher = blockFilePattern.matcher(blockFile.toString());
+    String previousFileName = matcher.replaceFirst("$1" + "previous" + "$3");
+    return ((new File(previousFileName)).exists());
   }
 
   private void startRollingUpgrade() throws Exception {
     LOG.info("Starting rolling upgrade");
+    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     final DFSAdmin dfsadmin = new DFSAdmin(conf);
     TestRollingUpgrade.runCmd(dfsadmin, true, "-rollingUpgrade", "prepare");
     triggerHeartBeats();
 
     // Ensure datanode rolling upgrade is started
-    assertTrue(dn.getFSDataset().trashEnabled(blockPoolId));
+    assertTrue(dn0.getFSDataset().trashEnabled(blockPoolId));
   }
 
   private void finalizeRollingUpgrade() throws Exception {
@@ -169,8 +181,8 @@ public class TestDataNodeRollingUpgrade {
     triggerHeartBeats();
 
     // Ensure datanode rolling upgrade is started
-    assertFalse(dn.getFSDataset().trashEnabled(blockPoolId));
-    BlockPoolSliceStorage bps = dn.getStorage().getBPStorage(blockPoolId);
+    assertFalse(dn0.getFSDataset().trashEnabled(blockPoolId));
+    BlockPoolSliceStorage bps = dn0.getStorage().getBPStorage(blockPoolId);
     assertFalse(bps.trashEnabled());
   }
 
@@ -179,13 +191,15 @@ public class TestDataNodeRollingUpgrade {
     // Restart the namenode with rolling upgrade rollback
     LOG.info("Starting rollback of the rolling upgrade");
     MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
+    dnprop.setDnArgs("-rollback");
     cluster.shutdownNameNodes();
     cluster.restartNameNode("-rollingupgrade", "rollback");
     cluster.restartDataNode(dnprop);
     cluster.waitActive();
     nn = cluster.getNameNode(0);
-    dn = cluster.getDataNodes().get(0);
+    dn0 = cluster.getDataNodes().get(0);
     triggerHeartBeats();
+    LOG.info("The cluster is active after rollback");
   }
 
   @Test (timeout=600000)
@@ -194,12 +208,11 @@ public class TestDataNodeRollingUpgrade {
     startCluster();
 
     // Create files in DFS.
-    Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
-    Path testFile2 = new Path("/TestDataNodeRollingUpgrade2.dat");
+    Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
+    Path testFile2 = new Path("/" + GenericTestUtils.getMethodName() + ".02.dat");
     DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
     DFSTestUtil.createFile(fs, testFile2, FILE_SIZE, REPL_FACTOR, SEED);
 
-    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     startRollingUpgrade();
     File blockFile = getBlockForFile(testFile2, true);
     File trashFile = getTrashFileForBlock(blockFile, false);
@@ -207,7 +220,7 @@ public class TestDataNodeRollingUpgrade {
     finalizeRollingUpgrade();
 
     // Ensure that delete file testFile2 stays deleted after finalize
-    ensureTrashDisabled();
+    assertFalse(isTrashRootPresent());
     assert(!fs.exists(testFile2));
     assert(fs.exists(testFile1));
 
@@ -222,11 +235,10 @@ public class TestDataNodeRollingUpgrade {
     startCluster();
 
     // Create files in DFS.
-    Path testFile1 = new Path("/TestDataNodeRollingUpgrade1.dat");
+    Path testFile1 = new Path("/" + GenericTestUtils.getMethodName() + ".01.dat");
     DFSTestUtil.createFile(fs, testFile1, FILE_SIZE, REPL_FACTOR, SEED);
     String fileContents1 = DFSTestUtil.readFile(fs, testFile1);
 
-    fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
     startRollingUpgrade();
 
     File blockFile = getBlockForFile(testFile1, true);
@@ -255,9 +267,9 @@ public class TestDataNodeRollingUpgrade {
     startCluster();
 
     // Create files in DFS.
-    String testFile1 = "/TestDataNodeXceiver1.dat";
-    String testFile2 = "/TestDataNodeXceiver2.dat";
-    String testFile3 = "/TestDataNodeXceiver3.dat";
+    String testFile1 = "/" + GenericTestUtils.getMethodName() + ".01.dat";
+    String testFile2 = "/" + GenericTestUtils.getMethodName() + ".02.dat";
+    String testFile3 = "/" + GenericTestUtils.getMethodName() + ".03.dat";
 
     DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
     DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
@@ -277,12 +289,12 @@ public class TestDataNodeRollingUpgrade {
     s3.write(toWrite, 0, 1024*1024*8);
     s3.flush();
 
-    assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
+    assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer()
         .getNumPeersXceiver());
     s1.close();
     s2.close();
     s3.close();
-    assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
+    assertTrue(dn0.getXferServer().getNumPeersXceiver() == dn0.getXferServer()
        .getNumPeersXceiver());
     client1.close();
     client2.close();
@@ -291,4 +303,143 @@ public class TestDataNodeRollingUpgrade {
       shutdownCluster();
     }
   }
+
+  /**
+   * Support for layout version change with rolling upgrade was
+   * added by HDFS-6800 and HDFS-6981.
+   */
+  @Test(timeout=300000)
+  public void testWithLayoutChangeAndFinalize() throws Exception {
+    final long seed = 0x600DF00D;
+    try {
+      startCluster();
+
+      Path[] paths = new Path[3];
+      File[] blockFiles = new File[3];
+
+      // Create two files in DFS.
+      for (int i = 0; i < 2; ++i) {
+        paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat");
+        DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 2, seed);
+      }
+
+      startRollingUpgrade();
+
+      // Delete the first file. The DN will save its block files in trash.
+      blockFiles[0] = getBlockForFile(paths[0], true);
+      File trashFile0 = getTrashFileForBlock(blockFiles[0], false);
+      deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0);
+
+      // Restart the DN with a new layout version to trigger layout upgrade.
+      LOG.info("Shutting down the Datanode");
+      MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
+      DFSTestUtil.addDataNodeLayoutVersion(
+          DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1,
+          "Test Layout for TestDataNodeRollingUpgrade");
+      LOG.info("Restarting the DataNode");
+      cluster.restartDataNode(dnprop, true);
+      cluster.waitActive();
+
+      dn0 = cluster.getDataNodes().get(0);
+      LOG.info("The DN has been restarted");
+      assertFalse(trashFile0.exists());
+      assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0]));
+
+      // Ensure that the block file for the first file was moved from 'trash' to 'previous'.
+      assertTrue(isBlockFileInPrevious(blockFiles[0]));
+      assertFalse(isTrashRootPresent());
+
+      // Delete the second file. Ensure that its block file is in previous.
+      blockFiles[1] = getBlockForFile(paths[1], true);
+      fs.delete(paths[1], false);
+      assertTrue(isBlockFileInPrevious(blockFiles[1]));
+      assertFalse(isTrashRootPresent());
+
+      // Finalize and ensure that neither block file exists in trash or previous.
+      finalizeRollingUpgrade();
+      assertFalse(isTrashRootPresent());
+      assertFalse(isBlockFileInPrevious(blockFiles[0]));
+      assertFalse(isBlockFileInPrevious(blockFiles[1]));
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Support for layout version change with rolling upgrade was
+   * added by HDFS-6800 and HDFS-6981.
+   */
+  @Test(timeout=300000)
+  public void testWithLayoutChangeAndRollback() throws Exception {
+    final long seed = 0x600DF00D;
+    try {
+      startCluster();
+
+      Path[] paths = new Path[3];
+      File[] blockFiles = new File[3];
+
+      // Create two files in DFS.
+      for (int i = 0; i < 2; ++i) {
+        paths[i] = new Path("/" + GenericTestUtils.getMethodName() + "." + i + ".dat");
+        DFSTestUtil.createFile(fs, paths[i], BLOCK_SIZE, (short) 1, seed);
+      }
+
+      startRollingUpgrade();
+
+      // Delete the first file. The DN will save its block files in trash.
+      blockFiles[0] = getBlockForFile(paths[0], true);
+      File trashFile0 = getTrashFileForBlock(blockFiles[0], false);
+      deleteAndEnsureInTrash(paths[0], blockFiles[0], trashFile0);
+
+      // Restart the DN with a new layout version to trigger layout upgrade.
+      LOG.info("Shutting down the Datanode");
+      MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(0);
+      DFSTestUtil.addDataNodeLayoutVersion(
+          DataNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1,
+          "Test Layout for TestDataNodeRollingUpgrade");
+      LOG.info("Restarting the DataNode");
+      cluster.restartDataNode(dnprop, true);
+      cluster.waitActive();
+
+      dn0 = cluster.getDataNodes().get(0);
+      LOG.info("The DN has been restarted");
+      assertFalse(trashFile0.exists());
+      assertFalse(dn0.getStorage().getBPStorage(blockPoolId).isTrashAllowed(blockFiles[0]));
+
+      // Ensure that the block file for the first file was moved from 'trash' to 'previous'.
+      assertTrue(isBlockFileInPrevious(blockFiles[0]));
+      assertFalse(isTrashRootPresent());
+
+      // Delete the second file. Ensure that its block file is in previous.
+      blockFiles[1] = getBlockForFile(paths[1], true);
+      fs.delete(paths[1], false);
+      assertTrue(isBlockFileInPrevious(blockFiles[1]));
+      assertFalse(isTrashRootPresent());
+
+      // Create and delete a third file. Its block file should not be
+      // in either trash or previous after deletion.
+      paths[2] = new Path("/" + GenericTestUtils.getMethodName() + ".2.dat");
+      DFSTestUtil.createFile(fs, paths[2], BLOCK_SIZE, (short) 1, seed);
+      blockFiles[2] = getBlockForFile(paths[2], true);
+      fs.delete(paths[2], false);
+      assertFalse(isBlockFileInPrevious(blockFiles[2]));
+      assertFalse(isTrashRootPresent());
+
+      // Rollback and ensure that the first two file contents were restored.
+      rollbackRollingUpgrade();
+      for (int i = 0; i < 2; ++i) {
+        byte[] actual = DFSTestUtil.readFileBuffer(fs, paths[i]);
+        byte[] calculated = DFSTestUtil.calculateFileContentsFromSeed(seed, BLOCK_SIZE);
+        assertArrayEquals(actual, calculated);
+      }
+
+      // And none of the block files must be in previous or trash.
+      assertFalse(isTrashRootPresent());
+      for (int i = 0; i < 3; ++i) {
+        assertFalse(isBlockFileInPrevious(blockFiles[i]));
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
 }
@@ -56,6 +56,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Tests NameNode interaction for all XAttr APIs.
@@ -129,51 +130,73 @@ public class FSXAttrBaseTest {
    */
   @Test(timeout = 120000)
   public void testCreateXAttr() throws Exception {
-    FileSystem.mkdirs(fs, path, FsPermission.createImmutable((short)0750));
-    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    Map<String, byte[]> expectedXAttrs = Maps.newHashMap();
+    expectedXAttrs.put(name1, value1);
+    expectedXAttrs.put(name2, null);
+    doTestCreateXAttr(path, expectedXAttrs);
+    expectedXAttrs.put(raw1, value1);
+    doTestCreateXAttr(rawPath, expectedXAttrs);
+  }
+
+  private void doTestCreateXAttr(Path usePath, Map<String,
+      byte[]> expectedXAttrs) throws Exception {
+    FileSystem.mkdirs(fs, usePath, FsPermission.createImmutable((short)0750));
+    fs.setXAttr(usePath, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
 
-    Map<String, byte[]> xattrs = fs.getXAttrs(path);
+    Map<String, byte[]> xattrs = fs.getXAttrs(usePath);
     Assert.assertEquals(xattrs.size(), 1);
     Assert.assertArrayEquals(value1, xattrs.get(name1));
 
-    fs.removeXAttr(path, name1);
+    fs.removeXAttr(usePath, name1);
 
-    xattrs = fs.getXAttrs(path);
+    xattrs = fs.getXAttrs(usePath);
     Assert.assertEquals(xattrs.size(), 0);
 
     // Create xattr which already exists.
-    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+    fs.setXAttr(usePath, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
     try {
-      fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
+      fs.setXAttr(usePath, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
       Assert.fail("Creating xattr which already exists should fail.");
     } catch (IOException e) {
     }
-    fs.removeXAttr(path, name1);
+    fs.removeXAttr(usePath, name1);
 
-    // Create two xattrs
-    fs.setXAttr(path, name1, value1, EnumSet.of(XAttrSetFlag.CREATE));
-    fs.setXAttr(path, name2, null, EnumSet.of(XAttrSetFlag.CREATE));
-    xattrs = fs.getXAttrs(path);
-    Assert.assertEquals(xattrs.size(), 2);
-    Assert.assertArrayEquals(value1, xattrs.get(name1));
-    Assert.assertArrayEquals(new byte[0], xattrs.get(name2));
+    // Create the xattrs
+    for (Map.Entry<String, byte[]> ent : expectedXAttrs.entrySet()) {
+      fs.setXAttr(usePath, ent.getKey(), ent.getValue(),
+          EnumSet.of(XAttrSetFlag.CREATE));
+    }
+    xattrs = fs.getXAttrs(usePath);
+    Assert.assertEquals(xattrs.size(), expectedXAttrs.size());
+    for (Map.Entry<String, byte[]> ent : expectedXAttrs.entrySet()) {
+      final byte[] val =
+          (ent.getValue() == null) ? new byte[0] : ent.getValue();
+      Assert.assertArrayEquals(val, xattrs.get(ent.getKey()));
+    }
 
     restart(false);
     initFileSystem();
-    xattrs = fs.getXAttrs(path);
-    Assert.assertEquals(xattrs.size(), 2);
-    Assert.assertArrayEquals(value1, xattrs.get(name1));
-    Assert.assertArrayEquals(new byte[0], xattrs.get(name2));
+    xattrs = fs.getXAttrs(usePath);
+    Assert.assertEquals(xattrs.size(), expectedXAttrs.size());
+    for (Map.Entry<String, byte[]> ent : expectedXAttrs.entrySet()) {
+      final byte[] val =
+          (ent.getValue() == null) ? new byte[0] : ent.getValue();
+      Assert.assertArrayEquals(val, xattrs.get(ent.getKey()));
+    }
 
     restart(true);
     initFileSystem();
-    xattrs = fs.getXAttrs(path);
-    Assert.assertEquals(xattrs.size(), 2);
-    Assert.assertArrayEquals(value1, xattrs.get(name1));
-    Assert.assertArrayEquals(new byte[0], xattrs.get(name2));
-
-    fs.removeXAttr(path, name1);
-    fs.removeXAttr(path, name2);
+    xattrs = fs.getXAttrs(usePath);
+    Assert.assertEquals(xattrs.size(), expectedXAttrs.size());
+    for (Map.Entry<String, byte[]> ent : expectedXAttrs.entrySet()) {
+      final byte[] val =
+          (ent.getValue() == null) ? new byte[0] : ent.getValue();
+      Assert.assertArrayEquals(val, xattrs.get(ent.getKey()));
+    }
+
+    for (Map.Entry<String, byte[]> ent : expectedXAttrs.entrySet()) {
+      fs.removeXAttr(usePath, ent.getKey());
+    }
   }
 
   /**
Binary file not shown.
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <EDITS>
-  <EDITS_VERSION>-58</EDITS_VERSION>
+  <EDITS_VERSION>-59</EDITS_VERSION>
   <RECORD>
     <OPCODE>OP_START_LOG_SEGMENT</OPCODE>
     <DATA>