Revert "HDFS-12681. Fold HdfsLocatedFileStatus into HdfsFileStatus."

This reverts commit b85603e3f8.
This commit is contained in:
Chris Douglas 2017-11-15 19:17:46 -08:00
parent b1941b200d
commit 675e9a8f57
12 changed files with 217 additions and 151 deletions

View File

@ -136,15 +136,6 @@ public class LocatedFileStatus extends FileStatus {
return locations;
}
/**
* Hook for subclasses to lazily set block locations. The {@link #locations}
* field should be null before this is called.
* @param locations Block locations for this instance.
*/
protected void setBlockLocations(BlockLocation[] locations) {
this.locations = locations;
}
/**
* Compare this FileStatus to another FileStatus
* @param o the FileStatus to be compared.

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
@ -187,7 +188,8 @@ public class Hdfs extends AbstractFileSystem {
@Override
public LocatedFileStatus next() throws IOException {
return getNext().makeQualifiedLocated(getUri(), p);
return ((HdfsLocatedFileStatus)getNext()).makeQualifiedLocated(
getUri(), p);
}
};
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@ -1211,7 +1212,8 @@ public class DistributedFileSystem extends FileSystem {
T next;
HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
if (needLocation) {
next = (T)fileStat.makeQualifiedLocated(getUri(), p);
next = (T)((HdfsLocatedFileStatus)fileStat)
.makeQualifiedLocated(getUri(), p);
} else {
next = (T)fileStat.makeQualified(getUri(), p);
}

View File

@ -26,17 +26,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
/**
* HDFS metadata for an entity in the filesystem.
/** Interface that represents the over the wire information for a file.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class HdfsFileStatus extends LocatedFileStatus {
public class HdfsFileStatus extends FileStatus {
private static final long serialVersionUID = 0x126eb82a;
@ -51,12 +49,11 @@ public final class HdfsFileStatus extends LocatedFileStatus {
private final int childrenNum;
private final byte storagePolicy;
// BlockLocations[] is the user-facing type
private transient LocatedBlocks hdfsloc;
public static final byte[] EMPTY_NAME = new byte[0];
/** Set of features potentially active on an instance. */
/**
* Set of features potentially active on an instance.
*/
public enum Flags {
HAS_ACL,
HAS_CRYPT,
@ -84,19 +81,18 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* @param storagePolicy ID which specifies storage policy
* @param ecPolicy the erasure coding policy
*/
private HdfsFileStatus(long length, boolean isdir, int replication,
protected HdfsFileStatus(long length, boolean isdir, int replication,
long blocksize, long mtime, long atime,
FsPermission permission, EnumSet<Flags> flags,
String owner, String group,
byte[] symlink, byte[] path, long fileId,
int childrenNum, FileEncryptionInfo feInfo,
byte storagePolicy, ErasureCodingPolicy ecPolicy,
LocatedBlocks hdfsloc) {
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
super(length, isdir, replication, blocksize, mtime,
atime, convert(isdir, symlink != null, permission, flags),
owner, group, null, null,
flags.contains(Flags.HAS_ACL), flags.contains(Flags.HAS_CRYPT),
flags.contains(Flags.HAS_EC), null);
flags.contains(Flags.HAS_EC));
this.flags = flags;
this.uSymlink = symlink;
this.uPath = path;
@ -105,7 +101,6 @@ public final class HdfsFileStatus extends LocatedFileStatus {
this.feInfo = feInfo;
this.storagePolicy = storagePolicy;
this.ecPolicy = ecPolicy;
this.hdfsloc = hdfsloc;
}
/**
@ -157,7 +152,7 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* Check if the local name is empty.
* @return true if the name is empty
*/
public boolean isEmptyLocalName() {
public final boolean isEmptyLocalName() {
return uPath.length == 0;
}
@ -165,7 +160,7 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* Get the string representation of the local name.
* @return the local name in string
*/
public String getLocalName() {
public final String getLocalName() {
return DFSUtilClient.bytes2String(uPath);
}
@ -173,7 +168,7 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* Get the Java UTF8 representation of the local name.
* @return the local name in java UTF8
*/
public byte[] getLocalNameInBytes() {
public final byte[] getLocalNameInBytes() {
return uPath;
}
@ -182,7 +177,7 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* @param parent the parent path
* @return the full path in string
*/
public String getFullName(String parent) {
public final String getFullName(final String parent) {
if (isEmptyLocalName()) {
return parent;
}
@ -200,7 +195,7 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* @param parent the parent path
* @return the full path
*/
public Path getFullPath(Path parent) {
public final Path getFullPath(final Path parent) {
if (isEmptyLocalName()) {
return parent;
}
@ -224,15 +219,15 @@ public final class HdfsFileStatus extends LocatedFileStatus {
/**
* Opaque referant for the symlink, to be resolved at the client.
*/
public byte[] getSymlinkInBytes() {
public final byte[] getSymlinkInBytes() {
return uSymlink;
}
public long getFileId() {
public final long getFileId() {
return fileId;
}
public FileEncryptionInfo getFileEncryptionInfo() {
public final FileEncryptionInfo getFileEncryptionInfo() {
return feInfo;
}
@ -244,12 +239,12 @@ public final class HdfsFileStatus extends LocatedFileStatus {
return ecPolicy;
}
public int getChildrenNum() {
public final int getChildrenNum() {
return childrenNum;
}
/** @return the storage policy id */
public byte getStoragePolicy() {
public final byte getStoragePolicy() {
return storagePolicy;
}
@ -262,10 +257,6 @@ public final class HdfsFileStatus extends LocatedFileStatus {
return flags.contains(Flags.SNAPSHOT_ENABLED);
}
public LocatedBlocks getLocatedBlocks() {
return hdfsloc;
}
@Override
public boolean equals(Object o) {
// satisfy findbugs
@ -286,30 +277,11 @@ public final class HdfsFileStatus extends LocatedFileStatus {
* @param parent Parent path of this element.
* @return Reference to this instance.
*/
public FileStatus makeQualified(URI defaultUri, Path parent) {
public final FileStatus makeQualified(URI defaultUri, Path parent) {
// fully-qualify path
setPath(getFullPath(parent).makeQualified(defaultUri, null));
return this; // API compatibility
}
/**
* This function is used to transform the underlying HDFS LocatedBlocks to
* BlockLocations. This method must be invoked before
* {@link #getBlockLocations()}.
*
* The returned BlockLocation will have different formats for replicated
* and erasure coded file.
* Please refer to
* {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations
* (FileStatus, long, long)}
* for examples.
*/
public LocatedFileStatus makeQualifiedLocated(URI defaultUri,
Path path) {
makeQualified(defaultUri, path);
setBlockLocations(
DFSUtilClient.locatedBlocks2Locations(getLocatedBlocks()));
return this;
}
/**
@ -339,7 +311,6 @@ public final class HdfsFileStatus extends LocatedFileStatus {
private byte storagePolicy =
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
private ErasureCodingPolicy ecPolicy = null;
private LocatedBlocks locations = null;
/**
* Set the length of the entity (default = 0).
@ -518,24 +489,13 @@ public final class HdfsFileStatus extends LocatedFileStatus {
return this;
}
/**
* Set the block locations for this entity (default = null).
* @param locations HDFS locations
* (see {@link #makeQualifiedLocated(URI, Path)})
* @return This Builder instance
*/
public Builder locations(LocatedBlocks locations) {
this.locations = locations;
return this;
}
/**
* @return An {@link HdfsFileStatus} instance from these parameters.
*/
public HdfsFileStatus build() {
return new HdfsFileStatus(length, isdir, replication, blocksize,
mtime, atime, permission, flags, owner, group, symlink, path, fileId,
childrenNum, feInfo, storagePolicy, ecPolicy, locations);
childrenNum, feInfo, storagePolicy, ecPolicy);
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.net.URI;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
/**
* Interface that represents the over the wire information
* including block locations for a file.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class HdfsLocatedFileStatus extends HdfsFileStatus {
private static final long serialVersionUID = 0x23c73328;
/**
* Left transient, because {@link #makeQualifiedLocated(URI,Path)}
* is the user-facing type.
*/
private transient LocatedBlocks locations;
/**
* Constructor
*
* @param length size
* @param isdir if this is directory
* @param block_replication the file's replication factor
* @param blocksize the file's block size
* @param modification_time most recent modification time
* @param access_time most recent access time
* @param permission permission
* @param owner owner
* @param group group
* @param symlink symbolic link
* @param path local path name in java UTF8 format
* @param fileId the file id
* @param locations block locations
* @param feInfo file encryption info
*/
public HdfsLocatedFileStatus(long length, boolean isdir,
int block_replication, long blocksize, long modification_time,
long access_time, FsPermission permission, EnumSet<Flags> flags,
String owner, String group, byte[] symlink, byte[] path, long fileId,
LocatedBlocks locations, int childrenNum, FileEncryptionInfo feInfo,
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, flags, owner, group, symlink, path, fileId,
childrenNum, feInfo, storagePolicy, ecPolicy);
this.locations = locations;
}
public LocatedBlocks getBlockLocations() {
return locations;
}
/**
* This function is used to transform the underlying HDFS LocatedBlocks to
* BlockLocations.
*
* The returned BlockLocation will have different formats for replicated
* and erasure coded file.
* Please refer to
* {@link org.apache.hadoop.fs.FileSystem#getFileBlockLocations
* (FileStatus, long, long)}
* for examples.
*/
public final LocatedFileStatus makeQualifiedLocated(URI defaultUri,
Path path) {
makeQualified(defaultUri, path);
return new LocatedFileStatus(this,
DFSUtilClient.locatedBlocks2Locations(getBlockLocations()));
}
@Override
public boolean equals(Object o) {
// satisfy findbugs
return super.equals(o);
}
@Override
public int hashCode() {
// satisfy findbugs
return super.hashCode();
}
}

View File

@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsPathHandle;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -1584,36 +1585,23 @@ public class PBHelperClient {
EnumSet<HdfsFileStatus.Flags> flags = fs.hasFlags()
? convertFlags(fs.getFlags())
: convertFlags(fs.getPermission());
return new HdfsFileStatus.Builder()
.length(fs.getLength())
.isdir(fs.getFileType().equals(FileType.IS_DIR))
.replication(fs.getBlockReplication())
.blocksize(fs.getBlocksize())
.mtime(fs.getModificationTime())
.atime(fs.getAccessTime())
.perm(convert(fs.getPermission()))
.flags(flags)
.owner(fs.getOwner())
.group(fs.getGroup())
.symlink(FileType.IS_SYMLINK.equals(fs.getFileType())
? fs.getSymlink().toByteArray()
: null)
.path(fs.getPath().toByteArray())
.fileId(fs.hasFileId()
? fs.getFileId()
: HdfsConstants.GRANDFATHER_INODE_ID)
.locations(fs.hasLocations() ? convert(fs.getLocations()) : null)
.children(fs.hasChildrenNum() ? fs.getChildrenNum() : -1)
.feInfo(fs.hasFileEncryptionInfo()
? convert(fs.getFileEncryptionInfo())
: null)
.storagePolicy(fs.hasStoragePolicy()
? (byte) fs.getStoragePolicy()
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED)
.ecPolicy(fs.hasEcPolicy()
? convertErasureCodingPolicy(fs.getEcPolicy())
: null)
.build();
return new HdfsLocatedFileStatus(
fs.getLength(), fs.getFileType().equals(FileType.IS_DIR),
fs.getBlockReplication(), fs.getBlocksize(),
fs.getModificationTime(), fs.getAccessTime(),
convert(fs.getPermission()),
flags,
fs.getOwner(), fs.getGroup(),
fs.getFileType().equals(FileType.IS_SYMLINK) ?
fs.getSymlink().toByteArray() : null,
fs.getPath().toByteArray(),
fs.hasFileId()? fs.getFileId(): HdfsConstants.GRANDFATHER_INODE_ID,
fs.hasLocations() ? convert(fs.getLocations()) : null,
fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
: HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
fs.hasEcPolicy() ? convertErasureCodingPolicy(fs.getEcPolicy()) : null);
}
private static EnumSet<HdfsFileStatus.Flags> convertFlags(int flags) {
@ -1876,9 +1864,9 @@ public class PBHelperClient {
if (dl == null)
return null;
List<HdfsFileStatusProto> partList = dl.getPartialListingList();
return new DirectoryListing(partList.isEmpty()
? new HdfsFileStatus[0]
: convert(partList.toArray(new HdfsFileStatusProto[partList.size()])),
return new DirectoryListing(partList.isEmpty() ?
new HdfsLocatedFileStatus[0] :
convert(partList.toArray(new HdfsFileStatusProto[partList.size()])),
dl.getRemainingEntries());
}
@ -2173,10 +2161,13 @@ public class PBHelperClient {
if (fs.getFileEncryptionInfo() != null) {
builder.setFileEncryptionInfo(convert(fs.getFileEncryptionInfo()));
}
LocatedBlocks locations = fs.getLocatedBlocks();
if (fs instanceof HdfsLocatedFileStatus) {
final HdfsLocatedFileStatus lfs = (HdfsLocatedFileStatus) fs;
LocatedBlocks locations = lfs.getBlockLocations();
if (locations != null) {
builder.setLocations(convert(locations));
}
}
if(fs.getErasureCodingPolicy() != null) {
builder.setEcPolicy(convertErasureCodingPolicy(
fs.getErasureCodingPolicy()));

View File

@ -260,10 +260,11 @@
<Method name="visitFile" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
</Match>
<!-- BlockLocations are user-facing, but LocatedBlocks are not. -->
<!-- HdfsFileStatus is user-facing, but HdfsLocatedFileStatus is not.
Defensible compatibility choices over time create odd corners. -->
<Match>
<Class name="org.apache.hadoop.hdfs.protocol.HdfsFileStatus" />
<Field name="hdfsloc" />
<Class name="org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus" />
<Field name="locations" />
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
</Match>
<Match>

View File

@ -364,7 +364,7 @@ public class Mover {
if (!isSnapshotPathInCurrent(fullPath)) {
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
processFile(fullPath, status, result);
processFile(fullPath, (HdfsLocatedFileStatus) status, result);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
@ -374,7 +374,7 @@ public class Mover {
}
/** @return true if it is necessary to run another round of migration */
private void processFile(String fullPath, HdfsFileStatus status,
private void processFile(String fullPath, HdfsLocatedFileStatus status,
Result result) {
byte policyId = status.getStoragePolicy();
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
@ -395,7 +395,7 @@ public class Mover {
status.getReplication());
final ErasureCodingPolicy ecPolicy = status.getErasureCodingPolicy();
final LocatedBlocks locatedBlocks = status.getLocatedBlocks();
final LocatedBlocks locatedBlocks = status.getBlockLocations();
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
for (int i = 0; i < lbs.size(); i++) {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -255,12 +256,13 @@ class FSDirStatAndListingOp {
listing[i] =
createFileStatus(fsd, iip, child, childStoragePolicy, needLocation);
listingCnt++;
LocatedBlocks blks = listing[i].getLocatedBlocks();
if (blks != null) {
if (listing[i] instanceof HdfsLocatedFileStatus) {
// Once we hit lsLimit locations, stop.
// This helps to prevent excessively large response payloads.
// Approximate #locations with locatedBlockCount() * repl_factor
locationBudget -=
LocatedBlocks blks =
((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
locationBudget -= (blks == null) ? 0 :
blks.locatedBlockCount() * listing[i].getReplication();
}
}
@ -484,6 +486,7 @@ class FSDirStatAndListingOp {
String owner, String group, byte[] symlink, byte[] path, long fileId,
int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy,
ErasureCodingPolicy ecPolicy, LocatedBlocks locations) {
if (locations == null) {
return new HdfsFileStatus.Builder()
.length(length)
.isdir(isdir)
@ -502,8 +505,12 @@ class FSDirStatAndListingOp {
.feInfo(feInfo)
.storagePolicy(storagePolicy)
.ecPolicy(ecPolicy)
.locations(locations)
.build();
} else {
return new HdfsLocatedFileStatus(length, isdir, replication, blocksize,
mtime, atime, permission, flags, owner, group, symlink, path,
fileId, locations, childrenNum, feInfo, storagePolicy, ecPolicy);
}
}
private static ContentSummary getContentSummaryInt(FSDirectory fsd,

View File

@ -1092,11 +1092,11 @@ public class TestBlockStoragePolicy {
return types;
}
private void checkLocatedBlocks(HdfsFileStatus status, int blockNum,
private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
int replicaNum, StorageType... types) {
List<StorageType> typeList = Lists.newArrayList();
Collections.addAll(typeList, types);
LocatedBlocks lbs = status.getLocatedBlocks();
LocatedBlocks lbs = status.getBlockLocations();
Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
Assert.assertEquals(replicaNum, lb.getStorageTypes().length);
@ -1127,7 +1127,7 @@ public class TestBlockStoragePolicy {
HdfsFileStatus[] status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
HdfsFileStatus fooStatus = status[0];
HdfsLocatedFileStatus fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, 3, before);
// change the replication factor to 5
@ -1140,7 +1140,7 @@ public class TestBlockStoragePolicy {
status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
fooStatus = status[0];
fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, numDataNodes, after);
// change the replication factor back to 3
@ -1157,7 +1157,7 @@ public class TestBlockStoragePolicy {
status = fs.getClient().listPaths(foo.toString(),
HdfsFileStatus.EMPTY_NAME, true).getPartialListing();
checkDirectoryListing(status, policyId);
fooStatus = status[0];
fooStatus = (HdfsLocatedFileStatus) status[0];
checkLocatedBlocks(fooStatus, 1, REPLICATION, before);
} finally {
cluster.shutdown();

View File

@ -274,7 +274,7 @@ public class TestDFSOutputStream {
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
DataStreamer stream = new DataStreamer(
new HdfsFileStatus.Builder().build(),
mock(HdfsFileStatus.class),
mock(ExtendedBlock.class),
client,
"foo", null, null, null, null, null, null);

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
@ -311,17 +312,18 @@ public class TestStorageMover {
private void verifyFile(final Path parent, final HdfsFileStatus status,
final Byte expectedPolicyId) throws Exception {
byte policyId = status.getStoragePolicy();
HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
byte policyId = fileStatus.getStoragePolicy();
BlockStoragePolicy policy = policies.getPolicy(policyId);
if (expectedPolicyId != null) {
Assert.assertEquals((byte)expectedPolicyId, policy.getId());
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
for(LocatedBlock lb : status.getLocatedBlocks().getLocatedBlocks()) {
for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
lb.getStorageTypes());
Assert.assertTrue(status.getFullName(parent.toString())
Assert.assertTrue(fileStatus.getFullName(parent.toString())
+ " with policy " + policy + " has non-empty overlap: " + diff
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
diff.removeOverlap(true));