HDFS-8887. Expose storage type and storage ID in BlockLocation.
(cherry picked from commit 1ea1a8334e
)
This commit is contained in:
parent
2ebdf5bfce
commit
d53296281a
|
@ -34,11 +34,15 @@ public class BlockLocation {
|
|||
private String[] cachedHosts; // Datanode hostnames with a cached replica
|
||||
private String[] names; // Datanode IP:xferPort for accessing the block
|
||||
private String[] topologyPaths; // Full path name in network topology
|
||||
private String[] storageIds; // Storage ID of each replica
|
||||
private StorageType[] storageTypes; // Storage type of each replica
|
||||
private long offset; // Offset of the block in the file
|
||||
private long length;
|
||||
private boolean corrupt;
|
||||
|
||||
private static final String[] EMPTY_STR_ARRAY = new String[0];
|
||||
private static final StorageType[] EMPTY_STORAGE_TYPE_ARRAY =
|
||||
new StorageType[0];
|
||||
|
||||
/**
|
||||
* Default Constructor
|
||||
|
@ -58,6 +62,8 @@ public class BlockLocation {
|
|||
this.offset = that.offset;
|
||||
this.length = that.length;
|
||||
this.corrupt = that.corrupt;
|
||||
this.storageIds = that.storageIds;
|
||||
this.storageTypes = that.storageTypes;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,6 +101,13 @@ public class BlockLocation {
|
|||
|
||||
public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
|
||||
String[] topologyPaths, long offset, long length, boolean corrupt) {
|
||||
this(names, hosts, cachedHosts, topologyPaths, null, null, offset, length,
|
||||
corrupt);
|
||||
}
|
||||
|
||||
public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
|
||||
String[] topologyPaths, String[] storageIds, StorageType[] storageTypes,
|
||||
long offset, long length, boolean corrupt) {
|
||||
if (names == null) {
|
||||
this.names = EMPTY_STR_ARRAY;
|
||||
} else {
|
||||
|
@ -115,6 +128,16 @@ public class BlockLocation {
|
|||
} else {
|
||||
this.topologyPaths = topologyPaths;
|
||||
}
|
||||
if (storageIds == null) {
|
||||
this.storageIds = EMPTY_STR_ARRAY;
|
||||
} else {
|
||||
this.storageIds = storageIds;
|
||||
}
|
||||
if (storageTypes == null) {
|
||||
this.storageTypes = EMPTY_STORAGE_TYPE_ARRAY;
|
||||
} else {
|
||||
this.storageTypes = storageTypes;
|
||||
}
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.corrupt = corrupt;
|
||||
|
@ -149,6 +172,20 @@ public class BlockLocation {
|
|||
return topologyPaths;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the storageID of each replica of the block.
|
||||
*/
|
||||
public String[] getStorageIds() {
|
||||
return storageIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the storage type of each replica of the block.
|
||||
*/
|
||||
public StorageType[] getStorageTypes() {
|
||||
return storageTypes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the start offset of file associated with this block
|
||||
*/
|
||||
|
@ -235,6 +272,22 @@ public class BlockLocation {
|
|||
}
|
||||
}
|
||||
|
||||
public void setStorageIds(String[] storageIds) {
|
||||
if (storageIds == null) {
|
||||
this.storageIds = EMPTY_STR_ARRAY;
|
||||
} else {
|
||||
this.storageIds = storageIds;
|
||||
}
|
||||
}
|
||||
|
||||
public void setStorageTypes(StorageType[] storageTypes) {
|
||||
if (storageTypes == null) {
|
||||
this.storageTypes = EMPTY_STORAGE_TYPE_ARRAY;
|
||||
} else {
|
||||
this.storageTypes = storageTypes;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder result = new StringBuilder();
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.junit.Test;
|
|||
public class TestBlockLocation {
|
||||
|
||||
private static final String[] EMPTY_STR_ARRAY = new String[0];
|
||||
private static final StorageType[] EMPTY_STORAGE_TYPE_ARRAY =
|
||||
new StorageType[0];
|
||||
|
||||
private static void checkBlockLocation(final BlockLocation loc)
|
||||
throws Exception {
|
||||
|
@ -36,22 +38,29 @@ public class TestBlockLocation {
|
|||
final long offset, final long length, final boolean corrupt)
|
||||
throws Exception {
|
||||
checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY,
|
||||
EMPTY_STR_ARRAY, offset, length, corrupt);
|
||||
EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STORAGE_TYPE_ARRAY, offset,
|
||||
length, corrupt);
|
||||
}
|
||||
|
||||
private static void checkBlockLocation(final BlockLocation loc,
|
||||
String[] names, String[] hosts, String[] cachedHosts,
|
||||
String[] topologyPaths, final long offset, final long length,
|
||||
String[] topologyPaths,
|
||||
String[] storageIds, StorageType[] storageTypes,
|
||||
final long offset, final long length,
|
||||
final boolean corrupt) throws Exception {
|
||||
assertNotNull(loc.getHosts());
|
||||
assertNotNull(loc.getCachedHosts());
|
||||
assertNotNull(loc.getNames());
|
||||
assertNotNull(loc.getTopologyPaths());
|
||||
assertNotNull(loc.getStorageIds());
|
||||
assertNotNull(loc.getStorageTypes());
|
||||
|
||||
assertArrayEquals(hosts, loc.getHosts());
|
||||
assertArrayEquals(cachedHosts, loc.getCachedHosts());
|
||||
assertArrayEquals(names, loc.getNames());
|
||||
assertArrayEquals(topologyPaths, loc.getTopologyPaths());
|
||||
assertArrayEquals(storageIds, loc.getStorageIds());
|
||||
assertArrayEquals(storageTypes, loc.getStorageTypes());
|
||||
|
||||
assertEquals(offset, loc.getOffset());
|
||||
assertEquals(length, loc.getLength());
|
||||
|
@ -75,6 +84,8 @@ public class TestBlockLocation {
|
|||
checkBlockLocation(loc, 1, 2, true);
|
||||
loc = new BlockLocation(null, null, null, null, 1, 2, true);
|
||||
checkBlockLocation(loc, 1, 2, true);
|
||||
loc = new BlockLocation(null, null, null, null, null, null, 1, 2, true);
|
||||
checkBlockLocation(loc, 1, 2, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,14 +106,18 @@ public class TestBlockLocation {
|
|||
String[] hosts = new String[] { "host" };
|
||||
String[] cachedHosts = new String[] { "cachedHost" };
|
||||
String[] topologyPaths = new String[] { "path" };
|
||||
String[] storageIds = new String[] { "storageId" };
|
||||
StorageType[] storageTypes = new StorageType[] { StorageType.DISK };
|
||||
loc.setNames(names);
|
||||
loc.setHosts(hosts);
|
||||
loc.setCachedHosts(cachedHosts);
|
||||
loc.setTopologyPaths(topologyPaths);
|
||||
loc.setStorageIds(storageIds);
|
||||
loc.setStorageTypes(storageTypes);
|
||||
loc.setOffset(1);
|
||||
loc.setLength(2);
|
||||
loc.setCorrupt(true);
|
||||
checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2,
|
||||
true);
|
||||
checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths,
|
||||
storageIds, storageTypes, 1, 2, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,6 +181,8 @@ public class DFSUtilClient {
|
|||
}
|
||||
blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
|
||||
racks,
|
||||
blk.getStorageIDs(),
|
||||
blk.getStorageTypes(),
|
||||
blk.getStartOffset(),
|
||||
blk.getBlockSize(),
|
||||
blk.isCorrupt());
|
||||
|
|
|
@ -435,6 +435,8 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8805. Archival Storage: getStoragePolicy should not need superuser privilege.
|
||||
(Brahma Reddy Battula via jing9)
|
||||
|
||||
HDFS-8887. Expose storage type and storage ID in BlockLocation. (wang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
*/
|
||||
@InterfaceStability.Unstable
|
||||
@InterfaceAudience.Public
|
||||
@Deprecated
|
||||
public class BlockStorageLocation extends BlockLocation {
|
||||
|
||||
private final VolumeId[] volumeIds;
|
||||
|
|
|
@ -233,6 +233,11 @@ public class DistributedFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* This API has been deprecated since the NameNode now tracks datanode
|
||||
* storages separately. Storage IDs can be gotten from {@link
|
||||
* BlockLocation#getStorageIds()}, which are functionally equivalent to
|
||||
* the volume IDs returned here (although a String rather than a byte[]).
|
||||
*
|
||||
* Used to query storage location information for a list of blocks. This list
|
||||
* of blocks is normally constructed via a series of calls to
|
||||
* {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
|
||||
|
@ -256,6 +261,7 @@ public class DistributedFileSystem extends FileSystem {
|
|||
* information for each replica of each block.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
@Deprecated
|
||||
public BlockStorageLocation[] getFileBlockStorageLocations(
|
||||
List<BlockLocation> blocks) throws IOException,
|
||||
UnsupportedOperationException, InvalidBlockTokenException {
|
||||
|
|
|
@ -40,8 +40,10 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
|
@ -62,13 +64,17 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
|||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.VolumeId;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
|
@ -702,6 +708,61 @@ public class TestDistributedFileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=120000)
|
||||
public void testLocatedFileStatusStorageIdsTypes() throws Exception {
|
||||
final Configuration conf = getTestConfiguration();
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3).build();
|
||||
try {
|
||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||
final Path testFile = new Path("/testListLocatedStatus");
|
||||
final int blockSize = 4096;
|
||||
final int numBlocks = 10;
|
||||
// Create a test file
|
||||
final int repl = 2;
|
||||
DFSTestUtil.createFile(fs, testFile, blockSize, numBlocks * blockSize,
|
||||
blockSize, (short) repl, 0xADDED);
|
||||
// Get the listing
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(testFile);
|
||||
assertTrue("Expected file to be present", it.hasNext());
|
||||
LocatedFileStatus stat = it.next();
|
||||
BlockLocation[] locs = stat.getBlockLocations();
|
||||
assertEquals("Unexpected number of locations", numBlocks, locs.length);
|
||||
|
||||
Set<String> dnStorageIds = new HashSet<>();
|
||||
for (DataNode d : cluster.getDataNodes()) {
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes = d.getFSDataset()
|
||||
.getFsVolumeReferences()) {
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
dnStorageIds.add(vol.getStorageID());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (BlockLocation loc : locs) {
|
||||
String[] ids = loc.getStorageIds();
|
||||
// Run it through a set to deduplicate, since there should be no dupes
|
||||
Set<String> storageIds = new HashSet<>();
|
||||
for (String id: ids) {
|
||||
storageIds.add(id);
|
||||
}
|
||||
assertEquals("Unexpected num storage ids", repl, storageIds.size());
|
||||
// Make sure these are all valid storage IDs
|
||||
assertTrue("Unknown storage IDs found!", dnStorageIds.containsAll
|
||||
(storageIds));
|
||||
// Check storage types are the default, since we didn't set any
|
||||
StorageType[] types = loc.getStorageTypes();
|
||||
assertEquals("Unexpected num storage types", repl, types.length);
|
||||
for (StorageType t: types) {
|
||||
assertEquals("Unexpected storage type", StorageType.DEFAULT, t);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the normal path of batching up BlockLocation[]s to be passed to a
|
||||
|
|
Loading…
Reference in New Issue