HDFS-8887. Expose storage type and storage ID in BlockLocation.

This commit is contained in:
Andrew Wang 2015-08-11 23:25:33 -07:00
parent 3ae716fa69
commit 1ea1a8334e
7 changed files with 145 additions and 6 deletions

View File

@ -34,11 +34,15 @@ public class BlockLocation {
private String[] cachedHosts; // Datanode hostnames with a cached replica
private String[] names; // Datanode IP:xferPort for accessing the block
private String[] topologyPaths; // Full path name in network topology
private String[] storageIds; // Storage ID of each replica
private StorageType[] storageTypes; // Storage type of each replica
private long offset; // Offset of the block in the file
private long length;
private boolean corrupt;
private static final String[] EMPTY_STR_ARRAY = new String[0];
private static final StorageType[] EMPTY_STORAGE_TYPE_ARRAY =
new StorageType[0];
/**
* Default Constructor
@ -58,6 +62,8 @@ public class BlockLocation {
this.offset = that.offset;
this.length = that.length;
this.corrupt = that.corrupt;
this.storageIds = that.storageIds;
this.storageTypes = that.storageTypes;
}
/**
@ -95,6 +101,13 @@ public class BlockLocation {
public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
String[] topologyPaths, long offset, long length, boolean corrupt) {
this(names, hosts, cachedHosts, topologyPaths, null, null, offset, length,
corrupt);
}
public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
String[] topologyPaths, String[] storageIds, StorageType[] storageTypes,
long offset, long length, boolean corrupt) {
if (names == null) {
this.names = EMPTY_STR_ARRAY;
} else {
@ -115,6 +128,16 @@ public class BlockLocation {
} else {
this.topologyPaths = topologyPaths;
}
if (storageIds == null) {
this.storageIds = EMPTY_STR_ARRAY;
} else {
this.storageIds = storageIds;
}
if (storageTypes == null) {
this.storageTypes = EMPTY_STORAGE_TYPE_ARRAY;
} else {
this.storageTypes = storageTypes;
}
this.offset = offset;
this.length = length;
this.corrupt = corrupt;
@ -149,6 +172,20 @@ public class BlockLocation {
return topologyPaths;
}
/**
* Get the storageID of each replica of the block.
*/
public String[] getStorageIds() {
return storageIds;
}
/**
* Get the storage type of each replica of the block.
*/
public StorageType[] getStorageTypes() {
return storageTypes;
}
/**
* Get the start offset of file associated with this block
*/
@ -235,6 +272,22 @@ public class BlockLocation {
}
}
public void setStorageIds(String[] storageIds) {
if (storageIds == null) {
this.storageIds = EMPTY_STR_ARRAY;
} else {
this.storageIds = storageIds;
}
}
public void setStorageTypes(StorageType[] storageTypes) {
if (storageTypes == null) {
this.storageTypes = EMPTY_STORAGE_TYPE_ARRAY;
} else {
this.storageTypes = storageTypes;
}
}
@Override
public String toString() {
StringBuilder result = new StringBuilder();

View File

@ -26,6 +26,8 @@ import org.junit.Test;
public class TestBlockLocation {
private static final String[] EMPTY_STR_ARRAY = new String[0];
private static final StorageType[] EMPTY_STORAGE_TYPE_ARRAY =
new StorageType[0];
private static void checkBlockLocation(final BlockLocation loc)
throws Exception {
@ -36,22 +38,29 @@ public class TestBlockLocation {
final long offset, final long length, final boolean corrupt)
throws Exception {
checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY,
EMPTY_STR_ARRAY, offset, length, corrupt);
EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STORAGE_TYPE_ARRAY, offset,
length, corrupt);
}
private static void checkBlockLocation(final BlockLocation loc,
String[] names, String[] hosts, String[] cachedHosts,
String[] topologyPaths, final long offset, final long length,
String[] topologyPaths,
String[] storageIds, StorageType[] storageTypes,
final long offset, final long length,
final boolean corrupt) throws Exception {
assertNotNull(loc.getHosts());
assertNotNull(loc.getCachedHosts());
assertNotNull(loc.getNames());
assertNotNull(loc.getTopologyPaths());
assertNotNull(loc.getStorageIds());
assertNotNull(loc.getStorageTypes());
assertArrayEquals(hosts, loc.getHosts());
assertArrayEquals(cachedHosts, loc.getCachedHosts());
assertArrayEquals(names, loc.getNames());
assertArrayEquals(topologyPaths, loc.getTopologyPaths());
assertArrayEquals(storageIds, loc.getStorageIds());
assertArrayEquals(storageTypes, loc.getStorageTypes());
assertEquals(offset, loc.getOffset());
assertEquals(length, loc.getLength());
@ -75,6 +84,8 @@ public class TestBlockLocation {
checkBlockLocation(loc, 1, 2, true);
loc = new BlockLocation(null, null, null, null, 1, 2, true);
checkBlockLocation(loc, 1, 2, true);
loc = new BlockLocation(null, null, null, null, null, null, 1, 2, true);
checkBlockLocation(loc, 1, 2, true);
}
/**
@ -95,14 +106,18 @@ public class TestBlockLocation {
String[] hosts = new String[] { "host" };
String[] cachedHosts = new String[] { "cachedHost" };
String[] topologyPaths = new String[] { "path" };
String[] storageIds = new String[] { "storageId" };
StorageType[] storageTypes = new StorageType[] { StorageType.DISK };
loc.setNames(names);
loc.setHosts(hosts);
loc.setCachedHosts(cachedHosts);
loc.setTopologyPaths(topologyPaths);
loc.setStorageIds(storageIds);
loc.setStorageTypes(storageTypes);
loc.setOffset(1);
loc.setLength(2);
loc.setCorrupt(true);
checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2,
true);
checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths,
storageIds, storageTypes, 1, 2, true);
}
}

View File

@ -181,6 +181,8 @@ public class DFSUtilClient {
}
blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
racks,
blk.getStorageIDs(),
blk.getStorageTypes(),
blk.getStartOffset(),
blk.getBlockSize(),
blk.isCorrupt());

View File

@ -778,6 +778,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8805. Archival Storage: getStoragePolicy should not need superuser privilege.
(Brahma Reddy Battula via jing9)
HDFS-8887. Expose storage type and storage ID in BlockLocation. (wang)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceStability.Unstable
@InterfaceAudience.Public
@Deprecated
public class BlockStorageLocation extends BlockLocation {
private final VolumeId[] volumeIds;

View File

@ -234,6 +234,11 @@ public class DistributedFileSystem extends FileSystem {
}
/**
* This API has been deprecated since the NameNode now tracks datanode
* storages separately. Storage IDs can be gotten from {@link
* BlockLocation#getStorageIds()}, which are functionally equivalent to
* the volume IDs returned here (although a String rather than a byte[]).
*
* Used to query storage location information for a list of blocks. This list
* of blocks is normally constructed via a series of calls to
* {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
@ -257,6 +262,7 @@ public class DistributedFileSystem extends FileSystem {
* information for each replica of each block.
*/
@InterfaceStability.Unstable
@Deprecated
public BlockStorageLocation[] getFileBlockStorageLocations(
List<BlockLocation> blocks) throws IOException,
UnsupportedOperationException, InvalidBlockTokenException {

View File

@ -40,8 +40,10 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.impl.Log4JLogger;
@ -71,7 +73,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -825,6 +830,61 @@ public class TestDistributedFileSystem {
}
}
@Test(timeout=120000)
public void testLocatedFileStatusStorageIdsTypes() throws Exception {
final Configuration conf = getTestConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3).build();
try {
final DistributedFileSystem fs = cluster.getFileSystem();
final Path testFile = new Path("/testListLocatedStatus");
final int blockSize = 4096;
final int numBlocks = 10;
// Create a test file
final int repl = 2;
DFSTestUtil.createFile(fs, testFile, blockSize, numBlocks * blockSize,
blockSize, (short) repl, 0xADDED);
// Get the listing
RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(testFile);
assertTrue("Expected file to be present", it.hasNext());
LocatedFileStatus stat = it.next();
BlockLocation[] locs = stat.getBlockLocations();
assertEquals("Unexpected number of locations", numBlocks, locs.length);
Set<String> dnStorageIds = new HashSet<>();
for (DataNode d : cluster.getDataNodes()) {
try (FsDatasetSpi.FsVolumeReferences volumes = d.getFSDataset()
.getFsVolumeReferences()) {
for (FsVolumeSpi vol : volumes) {
dnStorageIds.add(vol.getStorageID());
}
}
}
for (BlockLocation loc : locs) {
String[] ids = loc.getStorageIds();
// Run it through a set to deduplicate, since there should be no dupes
Set<String> storageIds = new HashSet<>();
for (String id: ids) {
storageIds.add(id);
}
assertEquals("Unexpected num storage ids", repl, storageIds.size());
// Make sure these are all valid storage IDs
assertTrue("Unknown storage IDs found!", dnStorageIds.containsAll
(storageIds));
// Check storage types are the default, since we didn't set any
StorageType[] types = loc.getStorageTypes();
assertEquals("Unexpected num storage types", repl, types.length);
for (StorageType t: types) {
assertEquals("Unexpected storage type", StorageType.DEFAULT, t);
}
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Tests the normal path of batching up BlockLocation[]s to be passed to a