From 39922a7b4cb4f39bd2dfdf8bf7b46dea3e709412 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 26 Feb 2014 21:57:10 +0000 Subject: [PATCH] HDFS-3969. Small bug fixes and improvements for disk locations API. Contributed by Todd Lipcon and Andrew Wang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1572287 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/fs/HdfsVolumeId.java | 26 +--- .../java/org/apache/hadoop/fs/VolumeId.java | 51 ------- .../hadoop/hdfs/BlockStorageLocationUtil.java | 125 ++++++++------- .../org/apache/hadoop/hdfs/DFSClient.java | 20 ++- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +- .../hadoop/hdfs/DistributedFileSystem.java | 6 +- .../apache/hadoop/hdfs/HdfsConfiguration.java | 4 +- .../hdfs/protocol/ClientDatanodeProtocol.java | 7 +- .../hdfs/protocol/HdfsBlocksMetadata.java | 29 +++- ...atanodeProtocolServerSideTranslatorPB.java | 13 +- .../ClientDatanodeProtocolTranslatorPB.java | 16 +- .../hadoop/hdfs/server/datanode/DataNode.java | 13 +- .../datanode/fsdataset/FsDatasetSpi.java | 7 +- .../fsdataset/impl/FsDatasetImpl.java | 30 ++-- .../main/proto/ClientDatanodeProtocol.proto | 14 +- .../src/main/resources/hdfs-default.xml | 6 +- .../org/apache/hadoop/fs/TestVolumeId.java | 39 +---- .../apache/hadoop/hdfs/MiniDFSCluster.java | 13 +- .../hdfs/MiniDFSClusterWithNodeGroup.java | 2 +- .../hdfs/TestDistributedFileSystem.java | 144 +++++++++++++++--- .../server/datanode/SimulatedFSDataset.java | 2 +- .../hdfs/server/namenode/ha/HATestUtil.java | 2 +- 23 files changed, 313 insertions(+), 263 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 04cd158b002..83643ed616d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -109,6 +109,9 @@ Release 2.4.0 - UNRELEASED HDFS-6018. Exception recorded in LOG when IPCLoggerChannel#close is called. (jing9) + HDFS-3969. Small bug fixes and improvements for disk locations API. + (Todd Lipcon and Andrew Wang) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java index aa6785037c1..6e9d3d77cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java @@ -17,18 +17,18 @@ */ package org.apache.hadoop.fs; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; /** * HDFS-specific volume identifier which implements {@link VolumeId}. Can be * used to differentiate between the data directories on a single datanode. This * identifier is only unique on a per-datanode basis. - * - * Note that invalid IDs are represented by {@link VolumeId#INVALID_VOLUME_ID}. */ @InterfaceStability.Unstable @InterfaceAudience.Public @@ -37,28 +37,15 @@ public class HdfsVolumeId implements VolumeId { private final byte[] id; public HdfsVolumeId(byte[] id) { - if (id == null) { - throw new NullPointerException("A valid Id can only be constructed " + - "with a non-null byte array."); - } + Preconditions.checkNotNull(id, "id cannot be null"); this.id = id; } - @Override - public final boolean isValid() { - return true; - } - @Override public int compareTo(VolumeId arg0) { if (arg0 == null) { return 1; } - if (!arg0.isValid()) { - // any valid ID is greater - // than any invalid ID: - return 1; - } return hashCode() - arg0.hashCode(); } @@ -76,14 +63,11 @@ public class HdfsVolumeId implements VolumeId { return true; } HdfsVolumeId that = (HdfsVolumeId) obj; - // NB: if (!obj.isValid()) { return false; } check is not necessary - // because we have class identity checking above, and for this class - // isValid() is always true. return new EqualsBuilder().append(this.id, that.id).isEquals(); } @Override public String toString() { - return Base64.encodeBase64String(id); + return StringUtils.byteToHexString(id); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java index b756241e976..e56e30409eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java @@ -28,57 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public public interface VolumeId extends Comparable { - /** - * Represents an invalid Volume ID (ID for unknown content). - */ - public static final VolumeId INVALID_VOLUME_ID = new VolumeId() { - - @Override - public int compareTo(VolumeId arg0) { - // This object is equal only to itself; - // It is greater than null, and - // is always less than any other VolumeId: - if (arg0 == null) { - return 1; - } - if (arg0 == this) { - return 0; - } else { - return -1; - } - } - - @Override - public boolean equals(Object obj) { - // this object is equal only to itself: - return (obj == this); - } - - @Override - public int hashCode() { - return Integer.MIN_VALUE; - } - - @Override - public boolean isValid() { - return false; - } - - @Override - public String toString() { - return "Invalid VolumeId"; - } - }; - - /** - * Indicates if the disk identifier is valid. Invalid identifiers indicate - * that the block was not present, or the location could otherwise not be - * determined. - * - * @return true if the disk identifier is valid - */ - public boolean isValid(); - @Override abstract public int compareTo(VolumeId arg0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java index 934f8dfe516..a2a53d30286 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -43,7 +41,6 @@ import org.apache.hadoop.fs.HdfsVolumeId; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -51,16 +48,20 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.token.Token; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + @InterfaceAudience.Private @InterfaceStability.Unstable class BlockStorageLocationUtil { - private static final Log LOG = LogFactory + static final Log LOG = LogFactory .getLog(BlockStorageLocationUtil.class); /** * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set - * of datanodes and blocks. + * of datanodes and blocks. The blocks must all correspond to the same + * block pool. * * @param datanodeBlocks * Map of datanodes to block replicas at each datanode @@ -70,6 +71,11 @@ class BlockStorageLocationUtil { private static List createVolumeBlockLocationCallables( Configuration conf, Map> datanodeBlocks, int timeout, boolean connectToDnViaHostname) { + + if (datanodeBlocks.isEmpty()) { + return Lists.newArrayList(); + } + // Construct the callables, one per datanode List callables = new ArrayList(); @@ -78,17 +84,32 @@ class BlockStorageLocationUtil { // Construct RPC parameters DatanodeInfo datanode = entry.getKey(); List locatedBlocks = entry.getValue(); - List extendedBlocks = - new ArrayList(locatedBlocks.size()); + if (locatedBlocks.isEmpty()) { + continue; + } + + // Ensure that the blocks all are from the same block pool. + String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId(); + for (LocatedBlock lb : locatedBlocks) { + if (!poolId.equals(lb.getBlock().getBlockPoolId())) { + throw new IllegalArgumentException( + "All blocks to be queried must be in the same block pool: " + + locatedBlocks.get(0).getBlock() + " and " + lb + + " are from different pools."); + } + } + + long[] blockIds = new long[locatedBlocks.size()]; + int i = 0; List> dnTokens = new ArrayList>( locatedBlocks.size()); for (LocatedBlock b : locatedBlocks) { - extendedBlocks.add(b.getBlock()); + blockIds[i++] = b.getBlock().getBlockId(); dnTokens.add(b.getBlockToken()); } VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( - conf, datanode, extendedBlocks, dnTokens, timeout, + conf, datanode, poolId, blockIds, dnTokens, timeout, connectToDnViaHostname); callables.add(callable); } @@ -102,18 +123,17 @@ class BlockStorageLocationUtil { * * @param datanodeBlocks * Map of datanodes to the blocks present on the DN - * @return metadatas List of block metadata for each datanode, specifying - * volume locations for each block + * @return metadatas Map of datanodes to block metadata of the DN * @throws InvalidBlockTokenException * if client does not have read access on a requested block */ - static List queryDatanodesForHdfsBlocksMetadata( + static Map queryDatanodesForHdfsBlocksMetadata( Configuration conf, Map> datanodeBlocks, - int poolsize, int timeout, boolean connectToDnViaHostname) + int poolsize, int timeoutMs, boolean connectToDnViaHostname) throws InvalidBlockTokenException { List callables = - createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout, + createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, connectToDnViaHostname); // Use a thread pool to execute the Callables in parallel @@ -121,27 +141,24 @@ class BlockStorageLocationUtil { new ArrayList>(); ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize); try { - futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS); + futures = executor.invokeAll(callables, timeoutMs, + TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // Swallow the exception here, because we can return partial results } executor.shutdown(); - // Initialize metadatas list with nulls - // This is used to later indicate if we didn't get a response from a DN - List metadatas = new ArrayList(); - for (int i = 0; i < futures.size(); i++) { - metadatas.add(null); - } + Map metadatas = + Maps.newHashMapWithExpectedSize(datanodeBlocks.size()); // Fill in metadatas with results from DN RPCs, where possible for (int i = 0; i < futures.size(); i++) { + VolumeBlockLocationCallable callable = callables.get(i); + DatanodeInfo datanode = callable.getDatanodeInfo(); Future future = futures.get(i); try { HdfsBlocksMetadata metadata = future.get(); - metadatas.set(i, metadata); + metadatas.put(callable.getDatanodeInfo(), metadata); } catch (ExecutionException e) { - VolumeBlockLocationCallable callable = callables.get(i); - DatanodeInfo datanode = callable.getDatanodeInfo(); Throwable t = e.getCause(); if (t instanceof InvalidBlockTokenException) { LOG.warn("Invalid access token when trying to retrieve " @@ -153,8 +170,8 @@ class BlockStorageLocationUtil { + " required #getHdfsBlocksMetadata() API"); throw (UnsupportedOperationException) t; } else { - LOG.info("Failed to connect to datanode " + - datanode.getIpcAddr(false)); + LOG.info("Failed to query block locations on datanode " + + datanode.getIpcAddr(false) + ": " + t); } if (LOG.isDebugEnabled()) { LOG.debug("Could not fetch information from datanode", t); @@ -175,23 +192,21 @@ class BlockStorageLocationUtil { * * @param blocks * Original LocatedBlock array - * @param datanodeBlocks - * Mapping from datanodes to the list of replicas on each datanode * @param metadatas * VolumeId information for the replicas on each datanode * @return blockVolumeIds per-replica VolumeId information associated with the * parent LocatedBlock */ static Map> associateVolumeIdsWithBlocks( - List blocks, Map> datanodeBlocks, List metadatas) { + List blocks, + Map metadatas) { // Initialize mapping of ExtendedBlock to LocatedBlock. // Used to associate results from DN RPCs to the parent LocatedBlock - Map extBlockToLocBlock = - new HashMap(); + Map blockIdToLocBlock = + new HashMap(); for (LocatedBlock b : blocks) { - extBlockToLocBlock.put(b.getBlock(), b); + blockIdToLocBlock.put(b.getBlock().getBlockId(), b); } // Initialize the mapping of blocks -> list of VolumeIds, one per replica @@ -200,9 +215,8 @@ class BlockStorageLocationUtil { new HashMap>(); for (LocatedBlock b : blocks) { ArrayList l = new ArrayList(b.getLocations().length); - // Start off all IDs as invalid, fill it in later with results from RPCs for (int i = 0; i < b.getLocations().length; i++) { - l.add(VolumeId.INVALID_VOLUME_ID); + l.add(null); } blockVolumeIds.put(b, l); } @@ -210,27 +224,28 @@ class BlockStorageLocationUtil { // Iterate through the list of metadatas (one per datanode). // For each metadata, if it's valid, insert its volume location information // into the Map returned to the caller - Iterator metadatasIter = metadatas.iterator(); - Iterator datanodeIter = datanodeBlocks.keySet().iterator(); - while (metadatasIter.hasNext()) { - HdfsBlocksMetadata metadata = metadatasIter.next(); - DatanodeInfo datanode = datanodeIter.next(); + for (Map.Entry entry : metadatas.entrySet()) { + DatanodeInfo datanode = entry.getKey(); + HdfsBlocksMetadata metadata = entry.getValue(); // Check if metadata is valid if (metadata == null) { continue; } - ExtendedBlock[] metaBlocks = metadata.getBlocks(); + long[] metaBlockIds = metadata.getBlockIds(); List metaVolumeIds = metadata.getVolumeIds(); List metaVolumeIndexes = metadata.getVolumeIndexes(); // Add VolumeId for each replica in the HdfsBlocksMetadata - for (int j = 0; j < metaBlocks.length; j++) { + for (int j = 0; j < metaBlockIds.length; j++) { int volumeIndex = metaVolumeIndexes.get(j); - ExtendedBlock extBlock = metaBlocks[j]; + long blockId = metaBlockIds[j]; // Skip if block wasn't found, or not a valid index into metaVolumeIds // Also skip if the DN responded with a block we didn't ask for if (volumeIndex == Integer.MAX_VALUE || volumeIndex >= metaVolumeIds.size() - || !extBlockToLocBlock.containsKey(extBlock)) { + || !blockIdToLocBlock.containsKey(blockId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("No data for block " + blockId); + } continue; } // Get the VolumeId by indexing into the list of VolumeIds @@ -238,7 +253,7 @@ class BlockStorageLocationUtil { byte[] volumeId = metaVolumeIds.get(volumeIndex); HdfsVolumeId id = new HdfsVolumeId(volumeId); // Find out which index we are in the LocatedBlock's replicas - LocatedBlock locBlock = extBlockToLocBlock.get(extBlock); + LocatedBlock locBlock = blockIdToLocBlock.get(blockId); DatanodeInfo[] dnInfos = locBlock.getLocations(); int index = -1; for (int k = 0; k < dnInfos.length; k++) { @@ -292,21 +307,23 @@ class BlockStorageLocationUtil { private static class VolumeBlockLocationCallable implements Callable { - private Configuration configuration; - private int timeout; - private DatanodeInfo datanode; - private List extendedBlocks; - private List> dnTokens; - private boolean connectToDnViaHostname; + private final Configuration configuration; + private final int timeout; + private final DatanodeInfo datanode; + private final String poolId; + private final long[] blockIds; + private final List> dnTokens; + private final boolean connectToDnViaHostname; VolumeBlockLocationCallable(Configuration configuration, - DatanodeInfo datanode, List extendedBlocks, + DatanodeInfo datanode, String poolId, long []blockIds, List> dnTokens, int timeout, boolean connectToDnViaHostname) { this.configuration = configuration; this.timeout = timeout; this.datanode = datanode; - this.extendedBlocks = extendedBlocks; + this.poolId = poolId; + this.blockIds = blockIds; this.dnTokens = dnTokens; this.connectToDnViaHostname = connectToDnViaHostname; } @@ -323,7 +340,7 @@ class BlockStorageLocationUtil { try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); - metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); + metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index ec9b3f3b40c..985dc57544f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -236,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { */ public static class Conf { final int hdfsTimeout; // timeout value for a DFS operation. + final int maxFailoverAttempts; final int maxRetryAttempts; final int failoverSleepBaseMillis; @@ -262,7 +263,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { final boolean connectToDnViaHostname; final boolean getHdfsBlocksMetadataEnabled; final int getFileBlockStorageLocationsNumThreads; - final int getFileBlockStorageLocationsTimeout; + final int getFileBlockStorageLocationsTimeoutMs; final int retryTimesForGetLastBlockLength; final int retryIntervalForGetLastBlockLength; @@ -284,7 +285,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { public Conf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getTimeout(conf); - maxFailoverAttempts = conf.getInt( DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); @@ -343,9 +343,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { getFileBlockStorageLocationsNumThreads = conf.getInt( DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); - getFileBlockStorageLocationsTimeout = conf.getInt( - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); + getFileBlockStorageLocationsTimeoutMs = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); retryTimesForGetLastBlockLength = conf.getInt( DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH, DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); @@ -1215,16 +1215,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { } // Make RPCs to the datanodes to get volume locations for its replicas - List metadatas = BlockStorageLocationUtil + Map metadatas = BlockStorageLocationUtil .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, - getConf().getFileBlockStorageLocationsTimeout, + getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); + if (LOG.isTraceEnabled()) { + LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); + } + // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map> blockVolumeIds = BlockStorageLocationUtil - .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas); + .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9a916cba425..40410d56b99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -66,8 +66,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads"; public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10; - public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout"; - public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = "dfs.client.file-block-storage-locations.timeout.millis"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000; public static final String DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.times.get-last-block-length"; public static final int DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT = 3; public static final String DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.interval-ms.get-last-block-length"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 2d6b54dbad8..9df725e592b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -230,9 +230,9 @@ public class DistributedFileSystem extends FileSystem { * The returned array of {@link BlockStorageLocation} augments * {@link BlockLocation} with a {@link VolumeId} per block replica. The * VolumeId specifies the volume on the datanode on which the replica resides. - * The VolumeId has to be checked via {@link VolumeId#isValid()} before being - * used because volume information can be unavailable if the corresponding - * datanode is down or if the requested block is not found. + * The VolumeId associated with a replica may be null because volume + * information can be unavailable if the corresponding datanode is down or + * if the requested block is not found. * * This API is unstable, and datanode-side support is disabled by default. It * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java index 345ceaaa79d..8f2966ac47b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java @@ -137,7 +137,9 @@ public class HdfsConfiguration extends Configuration { new DeprecationDelta("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES), new DeprecationDelta("dfs.federation.nameservice.id", - DFSConfigKeys.DFS_NAMESERVICE_ID) + DFSConfigKeys.DFS_NAMESERVICE_ID), + new DeprecationDelta("dfs.client.file-block-storage-locations.timeout", + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS) }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index bad4433c4c7..fd788ab6509 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -115,7 +115,8 @@ public interface ClientDatanodeProtocol { * This is in the form of an opaque {@link VolumeId} for each configured * data directory, which is not guaranteed to be the same across DN restarts. * - * @param blocks + * @param blockPoolId the pool to query + * @param blockIds * list of blocks on the local datanode * @param tokens * block access tokens corresponding to the requested blocks @@ -124,6 +125,6 @@ public interface ClientDatanodeProtocol { * @throws IOException * if datanode is unreachable, or replica is not found on datanode */ - HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, - List> tokens) throws IOException; + HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, + long []blockIds, List> tokens) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java index 5836f3da21c..487bbcbcadd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java @@ -22,6 +22,10 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; + /** * Augments an array of blocks on a datanode with additional information about * where the block is stored. @@ -30,10 +34,13 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public class HdfsBlocksMetadata { + /** The block pool that was queried */ + private final String blockPoolId; + /** * List of blocks */ - private final ExtendedBlock[] blocks; + private final long[] blockIds; /** * List of volumes @@ -50,7 +57,7 @@ public class HdfsBlocksMetadata { /** * Constructs HdfsBlocksMetadata. * - * @param blocks + * @param blockIds * List of blocks described * @param volumeIds * List of potential volume identifiers, specifying volumes where @@ -58,9 +65,13 @@ public class HdfsBlocksMetadata { * @param volumeIndexes * Indexes into the list of volume identifiers, one per block */ - public HdfsBlocksMetadata(ExtendedBlock[] blocks, List volumeIds, + public HdfsBlocksMetadata(String blockPoolId, + long[] blockIds, List volumeIds, List volumeIndexes) { - this.blocks = blocks; + Preconditions.checkArgument(blockIds.length == volumeIndexes.size(), + "Argument lengths should match"); + this.blockPoolId = blockPoolId; + this.blockIds = blockIds; this.volumeIds = volumeIds; this.volumeIndexes = volumeIndexes; } @@ -70,8 +81,8 @@ public class HdfsBlocksMetadata { * * @return array of blocks */ - public ExtendedBlock[] getBlocks() { - return blocks; + public long[] getBlockIds() { + return blockIds; } /** @@ -91,4 +102,10 @@ public class HdfsBlocksMetadata { public List getVolumeIndexes() { return volumeIndexes; } + + @Override + public String toString() { + return "Metadata for " + blockIds.length + " blocks in " + + blockPoolId + ": " + Joiner.on(",").join(Longs.asList(blockIds)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 210f3345972..e9a6bef5d51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -125,19 +126,17 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements throws ServiceException { HdfsBlocksMetadata resp; try { - // Construct the Lists to make the actual call - List blocks = - new ArrayList(request.getBlocksCount()); - for (ExtendedBlockProto b : request.getBlocksList()) { - blocks.add(PBHelper.convert(b)); - } + String poolId = request.getBlockPoolId(); + List> tokens = new ArrayList>(request.getTokensCount()); for (TokenProto b : request.getTokensList()) { tokens.add(PBHelper.convert(b)); } + long[] blockIds = Longs.toArray(request.getBlockIdsList()); + // Call the real implementation - resp = impl.getHdfsBlocksMetadata(blocks, tokens); + resp = impl.getHdfsBlocksMetadata(poolId, blockIds, tokens); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index dedba5af276..9e455579a15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import javax.net.SocketFactory; @@ -57,6 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -217,23 +219,19 @@ public class ClientDatanodeProtocolTranslatorPB implements } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, + long[] blockIds, List> tokens) throws IOException { - // Convert to proto objects - List blocksProtos = - new ArrayList(blocks.size()); List tokensProtos = new ArrayList(tokens.size()); - for (ExtendedBlock b : blocks) { - blocksProtos.add(PBHelper.convert(b)); - } for (Token t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() - .addAllBlocks(blocksProtos) + .setBlockPoolId(blockPoolId) + .addAllBlockIds(Longs.asList(blockIds)) .addAllTokens(tokensProtos) .build(); // Send the RPC @@ -252,7 +250,7 @@ public class ClientDatanodeProtocolTranslatorPB implements // Array of indexes into the list of volumes, one per block List volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block - return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + return new HdfsBlocksMetadata(blockPoolId, blockIds, volumeIds, volumeIndexes); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index abf47e9dbe2..56a64b2c31a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1176,22 +1176,23 @@ public class DataNode extends Configured } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + public HdfsBlocksMetadata getHdfsBlocksMetadata( + String bpId, long[] blockIds, List> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } - if (blocks.size() != tokens.size()) { + if (blockIds.length != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block - for (int i = 0; i < blocks.size(); i++) { - checkBlockToken(blocks.get(i), tokens.get(i), - BlockTokenSecretManager.AccessMode.READ); + for (int i = 0; i < blockIds.length; i++) { + checkBlockToken(new ExtendedBlock(bpId, blockIds[i]), + tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } - return data.getHdfsBlocksMetadata(blocks); + return data.getHdfsBlocksMetadata(bpId, blockIds); } private void checkBlockToken(ExtendedBlock block, Token token, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 415c6a985ab..f16c498d3cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -405,11 +405,12 @@ public interface FsDatasetSpi extends FSDatasetMBean { * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in * blocks. * - * @param blocks List of blocks for which to return metadata + * @param bpid pool to query + * @param blockIds List of block ids for which to return metadata * @return metadata Metadata for the list of blocks * @throws IOException */ - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) - throws IOException; + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, + long[] blockIds) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index c6c4ba32351..a406d6303b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1855,31 +1855,35 @@ class FsDatasetImpl implements FsDatasetSpi { } @Override // FsDatasetSpi - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) - throws IOException { + public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, + long[] blockIds) throws IOException { // List of VolumeIds, one per volume on the datanode List blocksVolumeIds = new ArrayList(volumes.volumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on - List blocksVolumeIndexes = new ArrayList(blocks.size()); + List blocksVolumeIndexes = new ArrayList(blockIds.length); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < volumes.volumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes - for (int i = 0; i < blocks.size(); i++) { - ExtendedBlock block = blocks.get(i); - FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume(); + for (int i = 0; i < blockIds.length; i++) { + long blockId = blockIds[i]; boolean isValid = false; + + ReplicaInfo info = volumeMap.get(poolId, blockId); int volumeIndex = 0; - for (FsVolumeImpl volume : volumes.volumes) { - // This comparison of references should be safe - if (blockVolume == volume) { - isValid = true; - break; + if (info != null) { + FsVolumeSpi blockVolume = info.getVolume(); + for (FsVolumeImpl volume : volumes.volumes) { + // This comparison of references should be safe + if (blockVolume == volume) { + isValid = true; + break; + } + volumeIndex++; } - volumeIndex++; } // Indicates that the block is not present, or not found in a data dir if (!isValid) { @@ -1887,7 +1891,7 @@ class FsDatasetImpl implements FsDatasetSpi { } blocksVolumeIndexes.add(volumeIndex); } - return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + return new HdfsBlocksMetadata(poolId, blockIds, blocksVolumeIds, blocksVolumeIndexes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index e62dbbc01d5..8a86a26217a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -102,12 +102,18 @@ message GetBlockLocalPathInfoResponseProto { } /** - * blocks - list of ExtendedBlocks on which we are querying additional info - * tokens - list of access tokens corresponding to list of ExtendedBlocks + * Query for the disk locations of a number of blocks on this DN. + * blockPoolId - the pool to query + * blockIds - list of block IDs to query + * tokens - list of access tokens corresponding to list of block IDs */ message GetHdfsBlockLocationsRequestProto { - repeated ExtendedBlockProto blocks = 1; + // Removed: HDFS-3969 + // repeated ExtendedBlockProto blocks = 1; repeated hadoop.common.TokenProto tokens = 2; + + required string blockPoolId = 3; + repeated sfixed64 blockIds = 4 [ packed = true ]; } /** @@ -118,7 +124,7 @@ message GetHdfsBlockLocationsRequestProto { */ message GetHdfsBlockLocationsResponseProto { repeated bytes volumeIds = 1; - repeated uint32 volumeIndexes = 2; + repeated uint32 volumeIndexes = 2 [ packed = true ]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 9c3b94fcf80..89b4dc6c954 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1299,10 +1299,10 @@ - dfs.client.file-block-storage-locations.timeout - 60 + dfs.client.file-block-storage-locations.timeout.millis + 1000 - Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations(). + Timeout (in milliseconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations(). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java index 3584f21b1a4..5bc1a7b1efb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java @@ -123,56 +123,19 @@ public class TestVolumeId { @Test public void testIdEmptyBytes() { final VolumeId idEmpty1 = new HdfsVolumeId(new byte[0]); - assertTrue(idEmpty1.isValid()); final VolumeId idEmpty2 = new HdfsVolumeId(new byte[0]); - assertTrue(idEmpty2.isValid()); final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }); - assertTrue(idNotEmpty.isValid()); testEq(true, idEmpty1, idEmpty2); testEq(false, idEmpty1, idNotEmpty); testEq(false, idEmpty2, idNotEmpty); } - - /* - * Test the VolumeId.INVALID_VOLUME_ID singleton. - */ - @Test - public void testInvalidId() { - try { - new HdfsVolumeId(null); - assertTrue("NPE expected.", false); - } catch (NullPointerException npe) { - // okay - } - final VolumeId idEmpty = new HdfsVolumeId(new byte[] {}); - final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }); - - testEq(false, VolumeId.INVALID_VOLUME_ID, idNotEmpty); - testEq(false, VolumeId.INVALID_VOLUME_ID, idEmpty); - - testEqMany(true, - new VolumeId[] { - VolumeId.INVALID_VOLUME_ID, - VolumeId.INVALID_VOLUME_ID, - VolumeId.INVALID_VOLUME_ID } ); - testEqMany(false, - new VolumeId[] { - VolumeId.INVALID_VOLUME_ID, - idEmpty, - idNotEmpty }); - } - + /* * test #toString() for typical VolumeId equality classes */ @Test public void testToString() { - // The #toString() return value is only checked for != null. - // We cannot assert more. - String strInvalid = VolumeId.INVALID_VOLUME_ID.toString(); - assertNotNull(strInvalid); - String strEmpty = new HdfsVolumeId(new byte[] {}).toString(); assertNotNull(strEmpty); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index e0c54c49720..cf281d9718f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -390,13 +390,15 @@ public class MiniDFSCluster { Configuration conf; String[] dnArgs; SecureResources secureResources; + int ipcPort; DataNodeProperties(DataNode node, Configuration conf, String[] args, - SecureResources secureResources) { + SecureResources secureResources, int ipcPort) { this.datanode = node; this.conf = conf; this.dnArgs = args; this.secureResources = secureResources; + this.ipcPort = ipcPort; } } @@ -1269,7 +1271,8 @@ public class MiniDFSCluster { racks[i-curDatanodesNum]); } dn.runDatanodeDaemon(); - dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); + dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, + secureResources, dn.getIpcPort())); } curDatanodesNum += numDataNodes; this.numDataNodes += numDataNodes; @@ -1719,10 +1722,12 @@ public class MiniDFSCluster { InetSocketAddress addr = dnprop.datanode.getXferAddress(); conf.set(DFS_DATANODE_ADDRESS_KEY, addr.getAddress().getHostAddress() + ":" + addr.getPort()); + conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, + addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort); } + DataNode newDn = DataNode.createDataNode(args, conf, secureResources); dataNodes.add(new DataNodeProperties( - DataNode.createDataNode(args, conf, secureResources), - newconf, args, secureResources)); + newDn, newconf, args, secureResources, newDn.getIpcPort())); numDataNodes++; return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java index ae5028c2825..d06a6a04519 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java @@ -178,7 +178,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster { } } dn.runDatanodeDaemon(); - dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); + dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort())); } curDatanodesNum += numDataNodes; this.numDataNodes += numDataNodes; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 8c861aab821..a55eb6d69f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -41,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -54,14 +57,21 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.Test; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + public class TestDistributedFileSystem { private static final Random RAN = new Random(); @@ -650,20 +660,47 @@ public class TestDistributedFileSystem { * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} * call */ - @Test + @Test(timeout=60000) public void testGetFileBlockStorageLocationsBatching() throws Exception { final Configuration conf = getTestConfiguration(); + ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.TRACE); + ((Log4JLogger)BlockStorageLocationUtil.LOG).getLogger().setLevel(Level.TRACE); + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.TRACE); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2).build(); try { - DistributedFileSystem fs = cluster.getFileSystem(); + final DistributedFileSystem fs = cluster.getFileSystem(); // Create two files - Path tmpFile1 = new Path("/tmpfile1.dat"); - Path tmpFile2 = new Path("/tmpfile2.dat"); + final Path tmpFile1 = new Path("/tmpfile1.dat"); + final Path tmpFile2 = new Path("/tmpfile2.dat"); DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); + // Make sure files are fully replicated before continuing + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + List list = Lists.newArrayList(); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0, + 1024))); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0, + 1024))); + int totalRepl = 0; + for (BlockLocation loc : list) { + totalRepl += loc.getHosts().length; + } + if (totalRepl == 4) { + return true; + } + } catch(IOException e) { + // swallow + } + return false; + } + }, 500, 30000); // Get locations of blocks of both files and concat together BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); @@ -694,7 +731,7 @@ public class TestDistributedFileSystem { VolumeId id = l.getVolumeIds()[i]; String name = l.getNames()[i]; assertTrue("Expected block to be valid on datanode " + name, - id.isValid()); + id != null); } } } finally { @@ -706,38 +743,97 @@ public class TestDistributedFileSystem { * Tests error paths for * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} */ - @Test + @Test(timeout=60000) public void testGetFileBlockStorageLocationsError() throws Exception { final Configuration conf = getTestConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + MiniDFSCluster cluster = null; try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.getDataNodes(); - DistributedFileSystem fs = cluster.getFileSystem(); - // Create a file - Path tmpFile = new Path("/tmpfile1.dat"); - DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl); - // Get locations of blocks of the file - BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024); - // Stop a datanode to simulate a failure - cluster.stopDataNode(0); + final DistributedFileSystem fs = cluster.getFileSystem(); + + // Create a few files and add together their block locations into + // a list. + final Path tmpFile1 = new Path("/errorfile1.dat"); + final Path tmpFile2 = new Path("/errorfile2.dat"); + + DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); + DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); + + // Make sure files are fully replicated before continuing + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + List list = Lists.newArrayList(); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0, + 1024))); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0, + 1024))); + int totalRepl = 0; + for (BlockLocation loc : list) { + totalRepl += loc.getHosts().length; + } + if (totalRepl == 4) { + return true; + } + } catch(IOException e) { + // swallow + } + return false; + } + }, 500, 30000); + + BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); + BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); + + List allLocs = Lists.newArrayList(); + allLocs.addAll(Arrays.asList(blockLocs1)); + allLocs.addAll(Arrays.asList(blockLocs2)); + + // Stop a datanode to simulate a failure. + DataNodeProperties stoppedNode = cluster.stopDataNode(0); + // Fetch VolumeBlockLocations - BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays - .asList(blockLocs)); - - assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1, + BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(allLocs); + assertEquals("Expected two HdfsBlockLocation for two 1-block files", 2, locs.length); - + for (BlockStorageLocation l : locs) { assertEquals("Expected two replicas for each block", 2, + l.getHosts().length); + assertEquals("Expected two VolumeIDs for each block", 2, l.getVolumeIds().length); - assertTrue("Expected one valid and one invalid replica", - (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid())); + assertTrue("Expected one valid and one invalid volume", + (l.getVolumeIds()[0] == null) ^ (l.getVolumeIds()[1] == null)); } + + // Start the datanode again, and remove one of the blocks. + // This is a different type of failure where the block itself + // is invalid. + cluster.restartDataNode(stoppedNode, true /*keepPort*/); + cluster.waitActive(); + + fs.delete(tmpFile2, true); + HATestUtil.waitForNNToIssueDeletions(cluster.getNameNode()); + cluster.triggerHeartbeats(); + HATestUtil.waitForDNDeletions(cluster); + + locs = fs.getFileBlockStorageLocations(allLocs); + assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2, + locs.length); + assertNotNull(locs[0].getVolumeIds()[0]); + assertNotNull(locs[0].getVolumeIds()[1]); + assertNull(locs[1].getVolumeIds()[0]); + assertNull(locs[1].getVolumeIds()[1]); } finally { - cluster.shutdown(); + if (cluster != null) { + cluster.shutdown(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 6939643bd1c..82081529740 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1049,7 +1049,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 9e9c181c1f6..b8fe2c1a720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -111,7 +111,7 @@ public abstract class HATestUtil { * Wait for the NameNode to issue any deletions that are already * pending (i.e. for the pendingDeletionBlocksCount to go to 0) */ - static void waitForNNToIssueDeletions(final NameNode nn) + public static void waitForNNToIssueDeletions(final NameNode nn) throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override