HDFS-6969. Archival Storage: INode#getStoragePolicyID should always return the latest storage policy. Contributed by Jing Zhao.

Jing Zhao 2014-09-01 17:56:04 -07:00
parent a26aa6bd07
commit 3e2a0b5446
15 changed files with 210 additions and 81 deletions
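
In short: INode#getStoragePolicyID now resolves the effective storage policy at read time, falling back to the current parent chain whenever no policy is set directly on the inode, so listings (including snapshot paths) always report the latest policy. Below is a minimal, self-contained sketch of that resolution rule; it is not the actual HDFS code, and the class name and policy IDs are illustrative only (the real implementations are in INodeFile.java and INodeDirectory.java further down).

// Minimal sketch of the resolution rule introduced by this commit. This is
// NOT the actual HDFS code; the class name and policy IDs are illustrative only.
public class StoragePolicySketch {
  static final byte ID_UNSPECIFIED = 0;   // stands in for BlockStoragePolicy.ID_UNSPECIFIED

  static class Node {
    final Node parent;
    final byte localPolicy;               // policy specified directly on this node, if any

    Node(Node parent, byte localPolicy) {
      this.parent = parent;
      this.localPolicy = localPolicy;
    }

    /** Policy specified directly on this node, or ID_UNSPECIFIED. */
    byte getLocalStoragePolicyID() {
      return localPolicy;
    }

    /** Effective policy: the local policy if set, otherwise inherited from the latest parent chain. */
    byte getStoragePolicyID() {
      byte id = getLocalStoragePolicyID();
      if (id != ID_UNSPECIFIED) {
        return id;
      }
      return parent != null ? parent.getStoragePolicyID() : ID_UNSPECIFIED;
    }
  }

  public static void main(String[] args) {
    final byte WARM = 8;                  // illustrative ID
    Node dir = new Node(null, WARM);      // policy set on the directory
    Node file = new Node(dir, ID_UNSPECIFIED);
    System.out.println(file.getLocalStoragePolicyID());  // 0: nothing set locally
    System.out.println(file.getStoragePolicyID());       // 8: inherited from the parent
  }
}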

View File: BlockStoragePolicy.java

@@ -278,7 +278,7 @@ public class BlockStoragePolicy {
}
private static byte parseID(String idString, String element, Configuration conf) {
Byte id = null;
byte id = 0;
try {
id = Byte.parseByte(idString);
} catch(NumberFormatException nfe) {

View File: PBHelper.java

@@ -173,6 +173,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
@@ -890,9 +891,25 @@ public class PBHelper {
}
builder.addAllTargets(convert(cmd.getTargets()))
.addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs()));
StorageType[][] types = cmd.getTargetStorageTypes();
if (types != null) {
builder.addAllTargetStorageTypes(convert(types));
}
return builder.build();
}
private static List<StorageTypesProto> convert(StorageType[][] types) {
List<StorageTypesProto> list = Lists.newArrayList();
if (types != null) {
for (StorageType[] ts : types) {
StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
builder.addAllStorageTypes(convertStorageTypes(ts));
list.add(builder.build());
}
}
return list;
}
public static BlockIdCommandProto convert(BlockIdCommand cmd) {
BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
.setBlockPoolId(cmd.getBlockPoolId());
@@ -1021,7 +1038,7 @@ public class PBHelper {
} else {
for(int i = 0; i < targetStorageTypes.length; i++) {
List<StorageTypeProto> p = targetStorageTypesList.get(i).getStorageTypesList();
targetStorageTypes[i] = p.toArray(new StorageType[p.size()]);
targetStorageTypes[i] = convertStorageTypes(p, targets[i].length);
}
}

View File: FSDirectory.java

@@ -1343,6 +1343,11 @@ public class FSDirectory implements Closeable {
}
}
private byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
return inodePolicy != BlockStoragePolicy.ID_UNSPECIFIED ? inodePolicy :
parentPolicy;
}
/**
* Get a partial listing of the indicated directory
*
@@ -1367,14 +1372,13 @@
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR)) {
return getSnapshotsListing(srcs, startAfter);
}
final INodesInPath inodesInPath = getINodesInPath(srcs, true);
final INodesInPath inodesInPath = getLastINodeInPath(srcs);
final int snapshot = inodesInPath.getPathSnapshotId();
final INode[] inodes = inodesInPath.getINodes();
final INode targetNode = inodes[inodes.length - 1];
byte parentStoragePolicy = isSuperUser ? getStoragePolicy(inodes,
snapshot) : BlockStoragePolicy.ID_UNSPECIFIED;
final INode targetNode = inodesInPath.getLastINode();
if (targetNode == null)
return null;
byte parentStoragePolicy = isSuperUser ?
targetNode.getStoragePolicyID() : BlockStoragePolicy.ID_UNSPECIFIED;
if (!targetNode.isDirectory()) {
return new DirectoryListing(
@@ -1393,11 +1397,11 @@
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i=0; i<numOfListing && locationBudget>0; i++) {
INode cur = contents.get(startChild+i);
byte curPolicy = cur.getStoragePolicyID(snapshot);
byte curPolicy = isSuperUser ? cur.getLocalStoragePolicyID() :
BlockStoragePolicy.ID_UNSPECIFIED;
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation,
curPolicy != BlockStoragePolicy.ID_UNSPECIFIED ?
curPolicy : parentStoragePolicy,
snapshot, isRawPath);
getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot,
isRawPath);
listingCnt++;
if (needLocation) {
// Once we hit lsLimit locations, stop.
@@ -2364,16 +2368,6 @@
storagePolicy);
}
private byte getStoragePolicy(INode[] inodes, int snapshotId) {
for (int i = inodes.length - 1; i >= 0; i--) {
byte policy = inodes[i].getStoragePolicyID(snapshotId);
if (policy != BlockStoragePolicy.ID_UNSPECIFIED) {
return policy;
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
}
/**
* Create FileStatus with location info by file INode
*/

View File: FSImageFormatPBINode.java

@@ -393,7 +393,7 @@ public final class FSImageFormatPBINode {
.setPermission(buildPermissionStatus(file, state.getStringMap()))
.setPreferredBlockSize(file.getPreferredBlockSize())
.setReplication(file.getFileReplication())
.setStoragePolicyID(file.getStoragePolicyID());
.setStoragePolicyID(file.getLocalStoragePolicyID());
AclFeature f = file.getAclFeature();
if (f != null) {

View File: FSNamesystem.java

@@ -106,7 +106,6 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URI;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -120,7 +119,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -182,7 +180,6 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -4207,7 +4204,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Get the content summary for a specific file/dir.
*
* @param src The string representation of the path to the file
* @param srcArg The string representation of the path to the file
*
* @throws AccessControlException if access is denied
* @throws UnresolvedLinkException if a symlink is encountered.
@@ -4944,12 +4941,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Add the given symbolic link to the fs. Record it in the edits log.
* @param path
* @param target
* @param dirPerms
* @param createParent
* @param logRetryCache
* @param dir
*/
private INodeSymlink addSymlink(String path, String target,
PermissionStatus dirPerms,

View File: INode.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -685,13 +686,19 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
}
/**
* @return the storage policy id of the inode
* @return the latest block storage policy id of the INode. Specifically,
* if a storage policy is directly specified on the INode then return the ID
* of that policy. Otherwise follow the latest parental path and return the
* ID of the first specified storage policy.
*/
public abstract byte getStoragePolicyID(int snapshotId);
public abstract byte getStoragePolicyID();
public byte getStoragePolicyID() {
return getStoragePolicyID(Snapshot.CURRENT_STATE_ID);
}
/**
* @return the storage policy directly specified on the INode. Return
* {@link BlockStoragePolicy#ID_UNSPECIFIED} if no policy has
* been specified.
*/
public abstract byte getLocalStoragePolicyID();
/**
* Breaks {@code path} into components.

View File: INodeAttributes.java

@@ -61,9 +61,6 @@ public interface INodeAttributes {
/** @return the access time. */
public long getAccessTime();
/** @return the storage policy ID */
public byte getStoragePolicyID();
/** A read-only copy of the inode attributes. */
public static abstract class SnapshotCopy implements INodeAttributes {
private final byte[] name;

View File: INodeDirectory.java

@@ -107,10 +107,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
}
@Override
public byte getStoragePolicyID(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
return getSnapshotINode(snapshotId).getStoragePolicyID();
}
public byte getLocalStoragePolicyID() {
XAttrFeature f = getXAttrFeature();
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
.getXAttrs();
@@ -122,6 +119,17 @@ public class INodeDirectory extends INodeWithAdditionalFields
return BlockStoragePolicy.ID_UNSPECIFIED;
}
@Override
public byte getStoragePolicyID() {
byte id = getLocalStoragePolicyID();
if (id != BlockStoragePolicy.ID_UNSPECIFIED) {
return id;
}
// if it is unspecified, check its parent
return getParent() != null ? getParent().getStoragePolicyID() :
BlockStoragePolicy.ID_UNSPECIFIED;
}
void setQuota(long nsQuota, long dsQuota) {
DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
if (quota != null) {

View File: INodeDirectoryAttributes.java

@@ -60,19 +60,6 @@ public interface INodeDirectoryAttributes extends INodeAttributes {
&& getAclFeature() == other.getAclFeature()
&& getXAttrFeature() == other.getXAttrFeature();
}
@Override
public byte getStoragePolicyID() {
XAttrFeature f = getXAttrFeature();
ImmutableList<XAttr> xattrs = f == null ? ImmutableList.<XAttr> of() : f
.getXAttrs();
for (XAttr xattr : xattrs) {
if (BlockStoragePolicy.isStoragePolicyXAttr(xattr)) {
return (xattr.getValue())[0];
}
}
return BlockStoragePolicy.ID_UNSPECIFIED;
}
}
public static class CopyWithQuota extends INodeDirectoryAttributes.SnapshotCopy {

View File: INodeFile.java

@@ -172,7 +172,6 @@ public class INodeFile extends INodeWithAdditionalFields
return getFileUnderConstructionFeature() != null;
}
/** Convert this file to an {@link INodeFileUnderConstruction}. */
INodeFile toUnderConstruction(String clientName, String clientMachine) {
Preconditions.checkState(!isUnderConstruction(),
"file is already under construction");
@@ -368,16 +367,18 @@
}
@Override
public byte getStoragePolicyID(int snapshotId) {
if (snapshotId != Snapshot.CURRENT_STATE_ID) {
return getSnapshotINode(snapshotId).getStoragePolicyID();
}
return getStoragePolicyID();
public byte getLocalStoragePolicyID() {
return HeaderFormat.getStoragePolicyID(header);
}
@Override
public byte getStoragePolicyID() {
return HeaderFormat.getStoragePolicyID(header);
byte id = getLocalStoragePolicyID();
if (id == BlockStoragePolicy.ID_UNSPECIFIED) {
return this.getParent() != null ?
this.getParent().getStoragePolicyID() : id;
}
return id;
}
private void setStoragePolicyID(byte storagePolicyId) {

View File: INodeFileAttributes.java

@@ -38,6 +38,8 @@ public interface INodeFileAttributes extends INodeAttributes {
public boolean metadataEquals(INodeFileAttributes other);
public byte getLocalStoragePolicyID();
/** A copy of the inode file attributes */
public static class SnapshotCopy extends INodeAttributes.SnapshotCopy
implements INodeFileAttributes {
@@ -68,7 +70,7 @@ public interface INodeFileAttributes extends INodeAttributes {
}
@Override
public byte getStoragePolicyID() {
public byte getLocalStoragePolicyID() {
return HeaderFormat.getStoragePolicyID(header);
}

View File: INodeMap.java

@@ -124,7 +124,12 @@ public class INodeMap {
}
@Override
public byte getStoragePolicyID(int snapshotId) {
public byte getStoragePolicyID(){
return BlockStoragePolicy.ID_UNSPECIFIED;
}
@Override
public byte getLocalStoragePolicyID() {
return BlockStoragePolicy.ID_UNSPECIFIED;
}
};

View File: INodeReference.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
import com.google.common.base.Preconditions;
@@ -287,8 +286,13 @@ public abstract class INodeReference extends INode {
}
@Override
public final byte getStoragePolicyID(int snapshotId) {
return referred.getStoragePolicyID(snapshotId);
public final byte getStoragePolicyID() {
return referred.getStoragePolicyID();
}
@Override
public final byte getLocalStoragePolicyID() {
return referred.getLocalStoragePolicyID();
}
@Override

View File: INodeSymlink.java

@@ -147,7 +147,13 @@ public class INodeSymlink extends INodeWithAdditionalFields {
}
@Override
public byte getStoragePolicyID(int snapshotId) {
public byte getStoragePolicyID() {
throw new UnsupportedOperationException(
"Storage policy are not supported on symlinks");
}
@Override
public byte getLocalStoragePolicyID() {
throw new UnsupportedOperationException(
"Storage policy are not supported on symlinks");
}

View File: TestBlockStoragePolicy.java

@@ -26,11 +26,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
@@ -45,6 +47,8 @@ public class TestBlockStoragePolicy {
static {
conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
POLICY_SUITE = BlockStoragePolicy.readBlockStorageSuite(conf);
DEFAULT_STORAGE_POLICY = POLICY_SUITE.getDefaultPolicy();
}
@@ -872,11 +876,12 @@ public class TestBlockStoragePolicy {
HdfsFileStatus.EMPTY_NAME).getPartialListing();
checkDirectoryListing(fooList, COLD, WARM);
// check the policy for /dir/.snapshot/s1/foo/f1
// check the policy for /dir/.snapshot/s1/foo/f1. Note we always return
// the latest storage policy for a file/directory.
Path s1f1 = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo/f1");
DirectoryListing f1Listing = fs.getClient().listPaths(s1f1.toString(),
HdfsFileStatus.EMPTY_NAME);
checkDirectoryListing(f1Listing.getPartialListing(), WARM);
checkDirectoryListing(f1Listing.getPartialListing(), COLD);
// delete f1
fs.delete(fooFile1, true);
@@ -885,7 +890,7 @@
checkDirectoryListing(fooList, WARM);
// check the policy for /dir/.snapshot/s1/foo/f1 again after the deletion
checkDirectoryListing(fs.getClient().listPaths(s1f1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);
// change the storage policy of foo dir
fs.setStoragePolicy(fooDir, "HOT");
@@ -902,21 +907,126 @@
Path s1 = SnapshotTestHelper.getSnapshotRoot(dir, "s1");
Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
// /dir/.snapshot/s1/foo/f1 and /dir/.snapshot/s1/foo/f2 are warm
HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
// /dir/.snapshot/s1/foo/f1 and /dir/.snapshot/s1/foo/f2 should still
// follow the latest
checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
// delete foo
fs.delete(fooDir, true);
checkDirectoryListing(fs.getClient().listPaths(s1.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM);
HdfsFileStatus.EMPTY_NAME).getPartialListing(), HOT);
checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
HdfsFileStatus.EMPTY_NAME).getPartialListing(), WARM, WARM);
HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private static StorageType[][] genStorageTypes(int numDataNodes) {
StorageType[][] types = new StorageType[numDataNodes][];
for (int i = 0; i < types.length; i++) {
types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
}
return types;
}
private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
int replicaNum, StorageType... types) {
List<StorageType> typeList = Lists.newArrayList();
for (StorageType type : types) {
typeList.add(type);
}
LocatedBlocks lbs = status.getBlockLocations();
Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
Assert.assertEquals(replicaNum, lb.getStorageTypes().length);
for (StorageType type : lb.getStorageTypes()) {
Assert.assertTrue(typeList.remove(type));
}
}
Assert.assertTrue(typeList.isEmpty());
}
private void testIncreaseFileRep(String policyName, byte policyId,
StorageType[] before,
StorageType[] after) throws Exception {
final int numDataNodes = 5;
final StorageType[][] types = genStorageTypes(numDataNodes);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes).storageTypes(types).build();
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
try {
final Path dir = new Path("/test");
fs.mkdirs(dir);
fs.setStoragePolicy(dir, policyName);
final Path foo = new Path(dir, "foo");
DFSTestUtil.createFile(fs, foo, FILE_LEN, REPLICATION, 0L);
// the storage policy of foo should be the specified policy, and its
// replicas should be stored on the expected ("before") storage types
HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
HdfsLocatedFileStatus fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, 3, before);
// change the replication factor to 5
fs.setReplication(foo, (short) numDataNodes);
Thread.sleep(1000);
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerHeartbeat(dn);
}
Thread.sleep(1000);
status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, 5, after);
} finally {
cluster.shutdown();
}
}
/**
* Consider a file with the HOT storage policy. Increase the replication
* factor of that file from 3 to 5. Make sure all replicas are created on DISK.
*/
@Test
public void testIncreaseHotFileRep() throws Exception {
testIncreaseFileRep("HOT", HOT, new StorageType[]{StorageType.DISK,
StorageType.DISK, StorageType.DISK},
new StorageType[]{StorageType.DISK, StorageType.DISK,
StorageType.DISK, StorageType.DISK, StorageType.DISK});
}
/**
* Consider a file with the WARM storage policy. Increase the replication
* factor of that file from 3 to 5. Make sure the replicas are created on
* DISK and ARCHIVE.
*/
@Test
public void testIncreaseWarmRep() throws Exception {
testIncreaseFileRep("WARM", WARM, new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE, StorageType.ARCHIVE},
new StorageType[]{StorageType.DISK, StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
}
/**
* Consider a file with the COLD storage policy. Increase the replication
* factor of that file from 3 to 5. Make sure all replicas are created on
* ARCHIVE.
*/
@Test
public void testIncreaseColdRep() throws Exception {
testIncreaseFileRep("COLD", COLD, new StorageType[]{StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE},
new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
}
}
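
For illustration only, here is a rough sketch of how the new behavior can be observed, written as if it sat inside the TestBlockStoragePolicy class above so it can reuse conf, FILE_LEN, REPLICATION, COLD and checkDirectoryListing; allowSnapshot/createSnapshot are standard HDFS snapshot APIs that do not appear in this diff, and this method is not part of the commit.

// Illustration only, NOT part of this commit: assumes it is placed inside
// TestBlockStoragePolicy so it can reuse the class constants and helpers above.
@Test
public void sketchSnapshotSeesLatestPolicy() throws Exception {
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(3).build();
  cluster.waitActive();
  final DistributedFileSystem fs = cluster.getFileSystem();
  try {
    final Path dir = new Path("/dir");
    final Path foo = new Path(dir, "foo");
    final Path f1 = new Path(foo, "f1");
    fs.mkdirs(foo);
    DFSTestUtil.createFile(fs, f1, FILE_LEN, REPLICATION, 0L);
    fs.setStoragePolicy(f1, "WARM");
    fs.allowSnapshot(dir);
    fs.createSnapshot(dir, "s1");        // snapshot taken while f1 is WARM
    fs.setStoragePolicy(f1, "COLD");     // change the policy afterwards
    // With this change, the snapshot path reports the latest policy (COLD),
    // not the policy that was in effect when the snapshot was taken (WARM).
    Path s1foo = SnapshotTestHelper.getSnapshotPath(dir, "s1", "foo");
    checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
        HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD);
  } finally {
    cluster.shutdown();
  }
}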