diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d220f506711..4df837a4fd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -362,6 +362,9 @@ Release 2.4.0 - UNRELEASED HDFS-6018. Exception recorded in LOG when IPCLoggerChannel#close is called. (jing9) + HDFS-3969. Small bug fixes and improvements for disk locations API. + (Todd Lipcon and Andrew Wang) + OPTIMIZATIONS HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java index aa6785037c1..6e9d3d77cdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java @@ -17,18 +17,18 @@ */ package org.apache.hadoop.fs; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.base.Preconditions; /** * HDFS-specific volume identifier which implements {@link VolumeId}. Can be * used to differentiate between the data directories on a single datanode. This * identifier is only unique on a per-datanode basis. - * - * Note that invalid IDs are represented by {@link VolumeId#INVALID_VOLUME_ID}. */ @InterfaceStability.Unstable @InterfaceAudience.Public @@ -37,28 +37,15 @@ public class HdfsVolumeId implements VolumeId { private final byte[] id; public HdfsVolumeId(byte[] id) { - if (id == null) { - throw new NullPointerException("A valid Id can only be constructed " + - "with a non-null byte array."); - } + Preconditions.checkNotNull(id, "id cannot be null"); this.id = id; } - @Override - public final boolean isValid() { - return true; - } - @Override public int compareTo(VolumeId arg0) { if (arg0 == null) { return 1; } - if (!arg0.isValid()) { - // any valid ID is greater - // than any invalid ID: - return 1; - } return hashCode() - arg0.hashCode(); } @@ -76,14 +63,11 @@ public boolean equals(Object obj) { return true; } HdfsVolumeId that = (HdfsVolumeId) obj; - // NB: if (!obj.isValid()) { return false; } check is not necessary - // because we have class identity checking above, and for this class - // isValid() is always true. return new EqualsBuilder().append(this.id, that.id).isEquals(); } @Override public String toString() { - return Base64.encodeBase64String(id); + return StringUtils.byteToHexString(id); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java index b756241e976..e56e30409eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java @@ -28,57 +28,6 @@ @InterfaceAudience.Public public interface VolumeId extends Comparable { - /** - * Represents an invalid Volume ID (ID for unknown content). 
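With isValid() and INVALID_VOLUME_ID removed, callers of getFileBlockStorageLocations() distinguish an unknown volume by a null entry in getVolumeIds() rather than an isValid() check. A minimal caller-side sketch of that convention, assuming a BlockStorageLocation[] obtained from the API (the helper name is illustrative and not part of this patch):

import java.io.IOException;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.VolumeId;

/** Illustrative helper: print the volume of each replica, skipping unknown ones. */
static void printReplicaVolumes(BlockStorageLocation[] locations) throws IOException {
  for (BlockStorageLocation loc : locations) {
    VolumeId[] ids = loc.getVolumeIds();
    String[] hosts = loc.getHosts();
    for (int i = 0; i < ids.length; i++) {
      if (ids[i] == null) {
        // Volume unknown: the datanode was down or the replica was not found there.
        continue;
      }
      // HdfsVolumeId#toString() now renders the id as a hex string.
      System.out.println(hosts[i] + " -> " + ids[i]);
    }
  }
}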
- */ - public static final VolumeId INVALID_VOLUME_ID = new VolumeId() { - - @Override - public int compareTo(VolumeId arg0) { - // This object is equal only to itself; - // It is greater than null, and - // is always less than any other VolumeId: - if (arg0 == null) { - return 1; - } - if (arg0 == this) { - return 0; - } else { - return -1; - } - } - - @Override - public boolean equals(Object obj) { - // this object is equal only to itself: - return (obj == this); - } - - @Override - public int hashCode() { - return Integer.MIN_VALUE; - } - - @Override - public boolean isValid() { - return false; - } - - @Override - public String toString() { - return "Invalid VolumeId"; - } - }; - - /** - * Indicates if the disk identifier is valid. Invalid identifiers indicate - * that the block was not present, or the location could otherwise not be - * determined. - * - * @return true if the disk identifier is valid - */ - public boolean isValid(); - @Override abstract public int compareTo(VolumeId arg0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java index 934f8dfe516..a2a53d30286 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -43,7 +41,6 @@ import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -51,16 +48,20 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.token.Token; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + @InterfaceAudience.Private @InterfaceStability.Unstable class BlockStorageLocationUtil { - private static final Log LOG = LogFactory + static final Log LOG = LogFactory .getLog(BlockStorageLocationUtil.class); /** * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set - * of datanodes and blocks. + * of datanodes and blocks. The blocks must all correspond to the same + * block pool. * * @param datanodeBlocks * Map of datanodes to block replicas at each datanode @@ -70,6 +71,11 @@ class BlockStorageLocationUtil { private static List createVolumeBlockLocationCallables( Configuration conf, Map> datanodeBlocks, int timeout, boolean connectToDnViaHostname) { + + if (datanodeBlocks.isEmpty()) { + return Lists.newArrayList(); + } + // Construct the callables, one per datanode List callables = new ArrayList(); @@ -78,17 +84,32 @@ private static List createVolumeBlockLocationCallab // Construct RPC parameters DatanodeInfo datanode = entry.getKey(); List locatedBlocks = entry.getValue(); - List extendedBlocks = - new ArrayList(locatedBlocks.size()); + if (locatedBlocks.isEmpty()) { + continue; + } + + // Ensure that the blocks all are from the same block pool. 
+ String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId(); + for (LocatedBlock lb : locatedBlocks) { + if (!poolId.equals(lb.getBlock().getBlockPoolId())) { + throw new IllegalArgumentException( + "All blocks to be queried must be in the same block pool: " + + locatedBlocks.get(0).getBlock() + " and " + lb + + " are from different pools."); + } + } + + long[] blockIds = new long[locatedBlocks.size()]; + int i = 0; List> dnTokens = new ArrayList>( locatedBlocks.size()); for (LocatedBlock b : locatedBlocks) { - extendedBlocks.add(b.getBlock()); + blockIds[i++] = b.getBlock().getBlockId(); dnTokens.add(b.getBlockToken()); } VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( - conf, datanode, extendedBlocks, dnTokens, timeout, + conf, datanode, poolId, blockIds, dnTokens, timeout, connectToDnViaHostname); callables.add(callable); } @@ -102,18 +123,17 @@ private static List createVolumeBlockLocationCallab * * @param datanodeBlocks * Map of datanodes to the blocks present on the DN - * @return metadatas List of block metadata for each datanode, specifying - * volume locations for each block + * @return metadatas Map of datanodes to block metadata of the DN * @throws InvalidBlockTokenException * if client does not have read access on a requested block */ - static List queryDatanodesForHdfsBlocksMetadata( + static Map queryDatanodesForHdfsBlocksMetadata( Configuration conf, Map> datanodeBlocks, - int poolsize, int timeout, boolean connectToDnViaHostname) + int poolsize, int timeoutMs, boolean connectToDnViaHostname) throws InvalidBlockTokenException { List callables = - createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout, + createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs, connectToDnViaHostname); // Use a thread pool to execute the Callables in parallel @@ -121,27 +141,24 @@ static List queryDatanodesForHdfsBlocksMetadata( new ArrayList>(); ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize); try { - futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS); + futures = executor.invokeAll(callables, timeoutMs, + TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // Swallow the exception here, because we can return partial results } executor.shutdown(); - // Initialize metadatas list with nulls - // This is used to later indicate if we didn't get a response from a DN - List metadatas = new ArrayList(); - for (int i = 0; i < futures.size(); i++) { - metadatas.add(null); - } + Map metadatas = + Maps.newHashMapWithExpectedSize(datanodeBlocks.size()); // Fill in metadatas with results from DN RPCs, where possible for (int i = 0; i < futures.size(); i++) { + VolumeBlockLocationCallable callable = callables.get(i); + DatanodeInfo datanode = callable.getDatanodeInfo(); Future future = futures.get(i); try { HdfsBlocksMetadata metadata = future.get(); - metadatas.set(i, metadata); + metadatas.put(callable.getDatanodeInfo(), metadata); } catch (ExecutionException e) { - VolumeBlockLocationCallable callable = callables.get(i); - DatanodeInfo datanode = callable.getDatanodeInfo(); Throwable t = e.getCause(); if (t instanceof InvalidBlockTokenException) { LOG.warn("Invalid access token when trying to retrieve " @@ -153,8 +170,8 @@ else if (t instanceof UnsupportedOperationException) { + " required #getHdfsBlocksMetadata() API"); throw (UnsupportedOperationException) t; } else { - LOG.info("Failed to connect to datanode " + - datanode.getIpcAddr(false)); + LOG.info("Failed to query block locations on datanode " + + 
datanode.getIpcAddr(false) + ": " + t); } if (LOG.isDebugEnabled()) { LOG.debug("Could not fetch information from datanode", t); @@ -175,23 +192,21 @@ else if (t instanceof UnsupportedOperationException) { * * @param blocks * Original LocatedBlock array - * @param datanodeBlocks - * Mapping from datanodes to the list of replicas on each datanode * @param metadatas * VolumeId information for the replicas on each datanode * @return blockVolumeIds per-replica VolumeId information associated with the * parent LocatedBlock */ static Map> associateVolumeIdsWithBlocks( - List blocks, Map> datanodeBlocks, List metadatas) { + List blocks, + Map metadatas) { // Initialize mapping of ExtendedBlock to LocatedBlock. // Used to associate results from DN RPCs to the parent LocatedBlock - Map extBlockToLocBlock = - new HashMap(); + Map blockIdToLocBlock = + new HashMap(); for (LocatedBlock b : blocks) { - extBlockToLocBlock.put(b.getBlock(), b); + blockIdToLocBlock.put(b.getBlock().getBlockId(), b); } // Initialize the mapping of blocks -> list of VolumeIds, one per replica @@ -200,9 +215,8 @@ static Map> associateVolumeIdsWithBlocks( new HashMap>(); for (LocatedBlock b : blocks) { ArrayList l = new ArrayList(b.getLocations().length); - // Start off all IDs as invalid, fill it in later with results from RPCs for (int i = 0; i < b.getLocations().length; i++) { - l.add(VolumeId.INVALID_VOLUME_ID); + l.add(null); } blockVolumeIds.put(b, l); } @@ -210,27 +224,28 @@ static Map> associateVolumeIdsWithBlocks( // Iterate through the list of metadatas (one per datanode). // For each metadata, if it's valid, insert its volume location information // into the Map returned to the caller - Iterator metadatasIter = metadatas.iterator(); - Iterator datanodeIter = datanodeBlocks.keySet().iterator(); - while (metadatasIter.hasNext()) { - HdfsBlocksMetadata metadata = metadatasIter.next(); - DatanodeInfo datanode = datanodeIter.next(); + for (Map.Entry entry : metadatas.entrySet()) { + DatanodeInfo datanode = entry.getKey(); + HdfsBlocksMetadata metadata = entry.getValue(); // Check if metadata is valid if (metadata == null) { continue; } - ExtendedBlock[] metaBlocks = metadata.getBlocks(); + long[] metaBlockIds = metadata.getBlockIds(); List metaVolumeIds = metadata.getVolumeIds(); List metaVolumeIndexes = metadata.getVolumeIndexes(); // Add VolumeId for each replica in the HdfsBlocksMetadata - for (int j = 0; j < metaBlocks.length; j++) { + for (int j = 0; j < metaBlockIds.length; j++) { int volumeIndex = metaVolumeIndexes.get(j); - ExtendedBlock extBlock = metaBlocks[j]; + long blockId = metaBlockIds[j]; // Skip if block wasn't found, or not a valid index into metaVolumeIds // Also skip if the DN responded with a block we didn't ask for if (volumeIndex == Integer.MAX_VALUE || volumeIndex >= metaVolumeIds.size() - || !extBlockToLocBlock.containsKey(extBlock)) { + || !blockIdToLocBlock.containsKey(blockId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("No data for block " + blockId); + } continue; } // Get the VolumeId by indexing into the list of VolumeIds @@ -238,7 +253,7 @@ static Map> associateVolumeIdsWithBlocks( byte[] volumeId = metaVolumeIds.get(volumeIndex); HdfsVolumeId id = new HdfsVolumeId(volumeId); // Find out which index we are in the LocatedBlock's replicas - LocatedBlock locBlock = extBlockToLocBlock.get(extBlock); + LocatedBlock locBlock = blockIdToLocBlock.get(blockId); DatanodeInfo[] dnInfos = locBlock.getLocations(); int index = -1; for (int k = 0; k < dnInfos.length; k++) { @@ -292,21 +307,23 @@ 
static BlockStorageLocation[] convertToVolumeBlockLocations( private static class VolumeBlockLocationCallable implements Callable { - private Configuration configuration; - private int timeout; - private DatanodeInfo datanode; - private List extendedBlocks; - private List> dnTokens; - private boolean connectToDnViaHostname; + private final Configuration configuration; + private final int timeout; + private final DatanodeInfo datanode; + private final String poolId; + private final long[] blockIds; + private final List> dnTokens; + private final boolean connectToDnViaHostname; VolumeBlockLocationCallable(Configuration configuration, - DatanodeInfo datanode, List extendedBlocks, + DatanodeInfo datanode, String poolId, long []blockIds, List> dnTokens, int timeout, boolean connectToDnViaHostname) { this.configuration = configuration; this.timeout = timeout; this.datanode = datanode; - this.extendedBlocks = extendedBlocks; + this.poolId = poolId; + this.blockIds = blockIds; this.dnTokens = dnTokens; this.connectToDnViaHostname = connectToDnViaHostname; } @@ -323,7 +340,7 @@ public HdfsBlocksMetadata call() throws Exception { try { cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, timeout, connectToDnViaHostname); - metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); + metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); } catch (IOException e) { // Bubble this up to the caller, handle with the Future throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3c066c78f76..ad3362d0805 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -241,6 +241,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { */ public static class Conf { final int hdfsTimeout; // timeout value for a DFS operation. 
+ final int maxFailoverAttempts; final int maxRetryAttempts; final int failoverSleepBaseMillis; @@ -267,7 +268,7 @@ public static class Conf { final boolean connectToDnViaHostname; final boolean getHdfsBlocksMetadataEnabled; final int getFileBlockStorageLocationsNumThreads; - final int getFileBlockStorageLocationsTimeout; + final int getFileBlockStorageLocationsTimeoutMs; final int retryTimesForGetLastBlockLength; final int retryIntervalForGetLastBlockLength; final long datanodeRestartTimeout; @@ -290,7 +291,6 @@ public static class Conf { public Conf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getTimeout(conf); - maxFailoverAttempts = conf.getInt( DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); @@ -349,9 +349,9 @@ public Conf(Configuration conf) { getFileBlockStorageLocationsNumThreads = conf.getInt( DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); - getFileBlockStorageLocationsTimeout = conf.getInt( - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, - DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); + getFileBlockStorageLocationsTimeoutMs = conf.getInt( + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS, + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT); retryTimesForGetLastBlockLength = conf.getInt( DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH, DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); @@ -1209,16 +1209,20 @@ public BlockStorageLocation[] getBlockStorageLocations( } // Make RPCs to the datanodes to get volume locations for its replicas - List metadatas = BlockStorageLocationUtil + Map metadatas = BlockStorageLocationUtil .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads, - getConf().getFileBlockStorageLocationsTimeout, + getConf().getFileBlockStorageLocationsTimeoutMs, getConf().connectToDnViaHostname); + if (LOG.isTraceEnabled()) { + LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); + } + // Regroup the returned VolumeId metadata to again be grouped by // LocatedBlock rather than by datanode Map> blockVolumeIds = BlockStorageLocationUtil - .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas); + .associateVolumeIdsWithBlocks(blocks, metadatas); // Combine original BlockLocations with new VolumeId information BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 7789b60fe38..db514637673 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -66,8 +66,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads"; public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10; - public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = 
"dfs.client.file-block-storage-locations.timeout"; - public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = "dfs.client.file-block-storage-locations.timeout.millis"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000; public static final String DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.times.get-last-block-length"; public static final int DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT = 3; public static final String DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.interval-ms.get-last-block-length"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 9c59fb0d77c..7f4bf0d5953 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -234,9 +234,9 @@ public BlockLocation[] next(final FileSystem fs, final Path p) * The returned array of {@link BlockStorageLocation} augments * {@link BlockLocation} with a {@link VolumeId} per block replica. The * VolumeId specifies the volume on the datanode on which the replica resides. - * The VolumeId has to be checked via {@link VolumeId#isValid()} before being - * used because volume information can be unavailable if the corresponding - * datanode is down or if the requested block is not found. + * The VolumeId associated with a replica may be null because volume + * information can be unavailable if the corresponding datanode is down or + * if the requested block is not found. * * This API is unstable, and datanode-side support is disabled by default. 
It * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java index 345ceaaa79d..8f2966ac47b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java @@ -137,7 +137,9 @@ private static void addDeprecatedKeys() { new DeprecationDelta("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES), new DeprecationDelta("dfs.federation.nameservice.id", - DFSConfigKeys.DFS_NAMESERVICE_ID) + DFSConfigKeys.DFS_NAMESERVICE_ID), + new DeprecationDelta("dfs.client.file-block-storage-locations.timeout", + DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS) }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java index 3ced32cd8e7..672ad0ce630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java @@ -113,7 +113,8 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, * This is in the form of an opaque {@link VolumeId} for each configured * data directory, which is not guaranteed to be the same across DN restarts. * - * @param blocks + * @param blockPoolId the pool to query + * @param blockIds * list of blocks on the local datanode * @param tokens * block access tokens corresponding to the requested blocks @@ -122,8 +123,8 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, * @throws IOException * if datanode is unreachable, or replica is not found on datanode */ - HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, - List> tokens) throws IOException; + HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, + long []blockIds, List> tokens) throws IOException; /** * Shuts down a datanode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java index 5836f3da21c..487bbcbcadd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsBlocksMetadata.java @@ -22,6 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; + /** * Augments an array of blocks on a datanode with additional information about * where the block is stored. @@ -30,10 +34,13 @@ @InterfaceStability.Unstable public class HdfsBlocksMetadata { + /** The block pool that was queried */ + private final String blockPoolId; + /** * List of blocks */ - private final ExtendedBlock[] blocks; + private final long[] blockIds; /** * List of volumes @@ -50,7 +57,7 @@ public class HdfsBlocksMetadata { /** * Constructs HdfsBlocksMetadata. 
* - * @param blocks + * @param blockIds * List of blocks described * @param volumeIds * List of potential volume identifiers, specifying volumes where @@ -58,9 +65,13 @@ public class HdfsBlocksMetadata { * @param volumeIndexes * Indexes into the list of volume identifiers, one per block */ - public HdfsBlocksMetadata(ExtendedBlock[] blocks, List volumeIds, + public HdfsBlocksMetadata(String blockPoolId, + long[] blockIds, List volumeIds, List volumeIndexes) { - this.blocks = blocks; + Preconditions.checkArgument(blockIds.length == volumeIndexes.size(), + "Argument lengths should match"); + this.blockPoolId = blockPoolId; + this.blockIds = blockIds; this.volumeIds = volumeIds; this.volumeIndexes = volumeIndexes; } @@ -70,8 +81,8 @@ public HdfsBlocksMetadata(ExtendedBlock[] blocks, List volumeIds, * * @return array of blocks */ - public ExtendedBlock[] getBlocks() { - return blocks; + public long[] getBlockIds() { + return blockIds; } /** @@ -91,4 +102,10 @@ public List getVolumeIds() { public List getVolumeIndexes() { return volumeIndexes; } + + @Override + public String toString() { + return "Metadata for " + blockIds.length + " blocks in " + + blockPoolId + ": " + Joiner.on(",").join(Longs.asList(blockIds)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java index 08617793b7f..c8fa2fed66a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java @@ -47,6 +47,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.VersionInfo; +import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -132,19 +133,17 @@ public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations( throws ServiceException { HdfsBlocksMetadata resp; try { - // Construct the Lists to make the actual call - List blocks = - new ArrayList(request.getBlocksCount()); - for (ExtendedBlockProto b : request.getBlocksList()) { - blocks.add(PBHelper.convert(b)); - } + String poolId = request.getBlockPoolId(); + List> tokens = new ArrayList>(request.getTokensCount()); for (TokenProto b : request.getTokensList()) { tokens.add(PBHelper.convert(b)); } + long[] blockIds = Longs.toArray(request.getBlockIdsList()); + // Call the real implementation - resp = impl.getHdfsBlocksMetadata(blocks, tokens); + resp = impl.getHdfsBlocksMetadata(poolId, blockIds, tokens); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 06f7cdc2e29..ca152b3a83a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import 
java.util.ArrayList; +import java.util.Arrays; import java.util.List; import javax.net.SocketFactory; @@ -61,6 +62,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; +import com.google.common.primitives.Longs; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -223,23 +225,19 @@ public Object getUnderlyingProxyObject() { } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId, + long[] blockIds, List> tokens) throws IOException { - // Convert to proto objects - List blocksProtos = - new ArrayList(blocks.size()); List tokensProtos = new ArrayList(tokens.size()); - for (ExtendedBlock b : blocks) { - blocksProtos.add(PBHelper.convert(b)); - } for (Token t : tokens) { tokensProtos.add(PBHelper.convert(t)); } // Build the request GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto.newBuilder() - .addAllBlocks(blocksProtos) + .setBlockPoolId(blockPoolId) + .addAllBlockIds(Longs.asList(blockIds)) .addAllTokens(tokensProtos) .build(); // Send the RPC @@ -258,7 +256,7 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, // Array of indexes into the list of volumes, one per block List volumeIndexes = response.getVolumeIndexesList(); // Parsed HdfsVolumeId values, one per block - return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + return new HdfsBlocksMetadata(blockPoolId, blockIds, volumeIds, volumeIndexes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0ed0896854b..8472287a273 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1136,22 +1136,23 @@ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk, } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + public HdfsBlocksMetadata getHdfsBlocksMetadata( + String bpId, long[] blockIds, List> tokens) throws IOException, UnsupportedOperationException { if (!getHdfsBlockLocationsEnabled) { throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata " + " is not enabled in datanode config"); } - if (blocks.size() != tokens.size()) { + if (blockIds.length != tokens.size()) { throw new IOException("Differing number of blocks and tokens"); } // Check access for each block - for (int i = 0; i < blocks.size(); i++) { - checkBlockToken(blocks.get(i), tokens.get(i), - BlockTokenSecretManager.AccessMode.READ); + for (int i = 0; i < blockIds.length; i++) { + checkBlockToken(new ExtendedBlock(bpId, blockIds[i]), + tokens.get(i), BlockTokenSecretManager.AccessMode.READ); } - return data.getHdfsBlocksMetadata(blocks); + return data.getHdfsBlocksMetadata(bpId, blockIds); } private void checkBlockToken(ExtendedBlock block, Token token, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index e1cdeb3fdd5..e60f937e1ca 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -405,12 +405,13 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in * blocks. * - * @param blocks List of blocks for which to return metadata + * @param bpid pool to query + * @param blockIds List of block ids for which to return metadata * @return metadata Metadata for the list of blocks * @throws IOException */ - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) - throws IOException; + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, + long[] blockIds) throws IOException; /** * Enable 'trash' for the given dataset. When trash is enabled, files are diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 0f6e308f359..0031eeafc27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1822,31 +1822,35 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) } @Override // FsDatasetSpi - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) - throws IOException { + public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId, + long[] blockIds) throws IOException { // List of VolumeIds, one per volume on the datanode List blocksVolumeIds = new ArrayList(volumes.volumes.size()); // List of indexes into the list of VolumeIds, pointing at the VolumeId of // the volume that the block is on - List blocksVolumeIndexes = new ArrayList(blocks.size()); + List blocksVolumeIndexes = new ArrayList(blockIds.length); // Initialize the list of VolumeIds simply by enumerating the volumes for (int i = 0; i < volumes.volumes.size(); i++) { blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); } // Determine the index of the VolumeId of each block's volume, by comparing // the block's volume against the enumerated volumes - for (int i = 0; i < blocks.size(); i++) { - ExtendedBlock block = blocks.get(i); - FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume(); + for (int i = 0; i < blockIds.length; i++) { + long blockId = blockIds[i]; boolean isValid = false; + + ReplicaInfo info = volumeMap.get(poolId, blockId); int volumeIndex = 0; - for (FsVolumeImpl volume : volumes.volumes) { - // This comparison of references should be safe - if (blockVolume == volume) { - isValid = true; - break; + if (info != null) { + FsVolumeSpi blockVolume = info.getVolume(); + for (FsVolumeImpl volume : volumes.volumes) { + // This comparison of references should be safe + if (blockVolume == volume) { + isValid = true; + break; + } + volumeIndex++; } - volumeIndex++; } // Indicates that the block is not present, or not found in a data dir if (!isValid) { @@ -1854,7 +1858,7 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) } blocksVolumeIndexes.add(volumeIndex); } - return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + return new HdfsBlocksMetadata(poolId, blockIds, blocksVolumeIds, blocksVolumeIndexes); } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto index 3d2b5a367ff..8779f7c3580 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto @@ -102,12 +102,18 @@ message GetBlockLocalPathInfoResponseProto { } /** - * blocks - list of ExtendedBlocks on which we are querying additional info - * tokens - list of access tokens corresponding to list of ExtendedBlocks + * Query for the disk locations of a number of blocks on this DN. + * blockPoolId - the pool to query + * blockIds - list of block IDs to query + * tokens - list of access tokens corresponding to list of block IDs */ message GetHdfsBlockLocationsRequestProto { - repeated ExtendedBlockProto blocks = 1; + // Removed: HDFS-3969 + // repeated ExtendedBlockProto blocks = 1; repeated hadoop.common.TokenProto tokens = 2; + + required string blockPoolId = 3; + repeated sfixed64 blockIds = 4 [ packed = true ]; } /** @@ -118,7 +124,7 @@ message GetHdfsBlockLocationsRequestProto { */ message GetHdfsBlockLocationsResponseProto { repeated bytes volumeIds = 1; - repeated uint32 volumeIndexes = 2; + repeated uint32 volumeIndexes = 2 [ packed = true ]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2468cb10745..6b61f4fcb52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1321,10 +1321,10 @@ - dfs.client.file-block-storage-locations.timeout - 60 + dfs.client.file-block-storage-locations.timeout.millis + 1000 - Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations(). + Timeout (in milliseconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations(). diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java index 3584f21b1a4..5bc1a7b1efb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestVolumeId.java @@ -123,56 +123,19 @@ private void testEqMany(final boolean eq, Comparable... volumeIds) { @Test public void testIdEmptyBytes() { final VolumeId idEmpty1 = new HdfsVolumeId(new byte[0]); - assertTrue(idEmpty1.isValid()); final VolumeId idEmpty2 = new HdfsVolumeId(new byte[0]); - assertTrue(idEmpty2.isValid()); final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }); - assertTrue(idNotEmpty.isValid()); testEq(true, idEmpty1, idEmpty2); testEq(false, idEmpty1, idNotEmpty); testEq(false, idEmpty2, idNotEmpty); } - - /* - * Test the VolumeId.INVALID_VOLUME_ID singleton. 
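For context on the renamed key, a minimal client-side sketch that sets the new millisecond timeout and queries storage locations; the class name, file path, and timeout value are illustrative assumptions, and the old dfs.client.file-block-storage-locations.timeout (seconds) key continues to work via the deprecation added in HdfsConfiguration. Datanode-side support still requires dfs.datanode.hdfs-blocks-metadata.enabled in the datanodes' own configuration.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class VolumeLocationsExample {                      // illustrative class name
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Client-side timeout for the parallel datanode RPCs, now expressed in milliseconds.
    conf.setInt("dfs.client.file-block-storage-locations.timeout.millis", 1500);
    // "dfs.datanode.hdfs-blocks-metadata.enabled" must be true in the datanode config;
    // setting it on the client alone is not sufficient.
    DistributedFileSystem fs =
        (DistributedFileSystem) new Path("/").getFileSystem(conf); // assumes fs.defaultFS is HDFS
    Path file = new Path("/tmpfile1.dat");                         // illustrative path
    BlockLocation[] blockLocs = fs.getFileBlockLocations(file, 0, 1024);
    BlockStorageLocation[] storageLocs =
        fs.getFileBlockStorageLocations(Arrays.asList(blockLocs));
    System.out.println("Queried " + storageLocs.length + " blocks");
  }
}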
- */ - @Test - public void testInvalidId() { - try { - new HdfsVolumeId(null); - assertTrue("NPE expected.", false); - } catch (NullPointerException npe) { - // okay - } - final VolumeId idEmpty = new HdfsVolumeId(new byte[] {}); - final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }); - - testEq(false, VolumeId.INVALID_VOLUME_ID, idNotEmpty); - testEq(false, VolumeId.INVALID_VOLUME_ID, idEmpty); - - testEqMany(true, - new VolumeId[] { - VolumeId.INVALID_VOLUME_ID, - VolumeId.INVALID_VOLUME_ID, - VolumeId.INVALID_VOLUME_ID } ); - testEqMany(false, - new VolumeId[] { - VolumeId.INVALID_VOLUME_ID, - idEmpty, - idNotEmpty }); - } - + /* * test #toString() for typical VolumeId equality classes */ @Test public void testToString() { - // The #toString() return value is only checked for != null. - // We cannot assert more. - String strInvalid = VolumeId.INVALID_VOLUME_ID.toString(); - assertNotNull(strInvalid); - String strEmpty = new HdfsVolumeId(new byte[] {}).toString(); assertNotNull(strEmpty); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 2bf11033af9..d81d67b67c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -399,13 +399,15 @@ public class DataNodeProperties { Configuration conf; String[] dnArgs; SecureResources secureResources; + int ipcPort; DataNodeProperties(DataNode node, Configuration conf, String[] args, - SecureResources secureResources) { + SecureResources secureResources, int ipcPort) { this.datanode = node; this.conf = conf; this.dnArgs = args; this.secureResources = secureResources; + this.ipcPort = ipcPort; } public void setDnArgs(String ... 
args) { @@ -1301,7 +1303,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, racks[i-curDatanodesNum]); } dn.runDatanodeDaemon(); - dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); + dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, + secureResources, dn.getIpcPort())); } curDatanodesNum += numDataNodes; this.numDataNodes += numDataNodes; @@ -1758,10 +1761,12 @@ public synchronized boolean restartDataNode(DataNodeProperties dnprop, InetSocketAddress addr = dnprop.datanode.getXferAddress(); conf.set(DFS_DATANODE_ADDRESS_KEY, addr.getAddress().getHostAddress() + ":" + addr.getPort()); + conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, + addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort); } + DataNode newDn = DataNode.createDataNode(args, conf, secureResources); dataNodes.add(new DataNodeProperties( - DataNode.createDataNode(args, conf, secureResources), - newconf, args, secureResources)); + newDn, newconf, args, secureResources, newDn.getIpcPort())); numDataNodes++; return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java index ae5028c2825..d06a6a04519 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java @@ -178,7 +178,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, } } dn.runDatanodeDaemon(); - dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); + dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort())); } curDatanodesNum += numDataNodes; this.numDataNodes += numDataNodes; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 9bafa263805..42b1f9aab0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; @@ -43,6 +45,7 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -56,15 +59,22 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.security.UserGroupInformation; +import 
org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.Test; import org.mockito.InOrder; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + public class TestDistributedFileSystem { private static final Random RAN = new Random(); @@ -684,20 +694,47 @@ public void testAllWithNoXmlDefaults() throws Exception { * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} * call */ - @Test + @Test(timeout=60000) public void testGetFileBlockStorageLocationsBatching() throws Exception { final Configuration conf = getTestConfiguration(); + ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.TRACE); + ((Log4JLogger)BlockStorageLocationUtil.LOG).getLogger().setLevel(Level.TRACE); + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.TRACE); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2).build(); try { - DistributedFileSystem fs = cluster.getFileSystem(); + final DistributedFileSystem fs = cluster.getFileSystem(); // Create two files - Path tmpFile1 = new Path("/tmpfile1.dat"); - Path tmpFile2 = new Path("/tmpfile2.dat"); + final Path tmpFile1 = new Path("/tmpfile1.dat"); + final Path tmpFile2 = new Path("/tmpfile2.dat"); DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); + // Make sure files are fully replicated before continuing + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + List list = Lists.newArrayList(); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0, + 1024))); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0, + 1024))); + int totalRepl = 0; + for (BlockLocation loc : list) { + totalRepl += loc.getHosts().length; + } + if (totalRepl == 4) { + return true; + } + } catch(IOException e) { + // swallow + } + return false; + } + }, 500, 30000); // Get locations of blocks of both files and concat together BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); @@ -728,7 +765,7 @@ public void testGetFileBlockStorageLocationsBatching() throws Exception { VolumeId id = l.getVolumeIds()[i]; String name = l.getNames()[i]; assertTrue("Expected block to be valid on datanode " + name, - id.isValid()); + id != null); } } } finally { @@ -740,38 +777,97 @@ public void testGetFileBlockStorageLocationsBatching() throws Exception { * Tests error paths for * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} */ - @Test + @Test(timeout=60000) public void testGetFileBlockStorageLocationsError() throws Exception { final Configuration conf = getTestConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(2).build(); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + MiniDFSCluster cluster = null; try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.getDataNodes(); - DistributedFileSystem fs = cluster.getFileSystem(); - // Create a file - Path tmpFile = new Path("/tmpfile1.dat"); - DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl); - // Get locations of blocks of the 
file - BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024); - // Stop a datanode to simulate a failure - cluster.stopDataNode(0); + final DistributedFileSystem fs = cluster.getFileSystem(); + + // Create a few files and add together their block locations into + // a list. + final Path tmpFile1 = new Path("/errorfile1.dat"); + final Path tmpFile2 = new Path("/errorfile2.dat"); + + DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); + DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); + + // Make sure files are fully replicated before continuing + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + List list = Lists.newArrayList(); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0, + 1024))); + list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0, + 1024))); + int totalRepl = 0; + for (BlockLocation loc : list) { + totalRepl += loc.getHosts().length; + } + if (totalRepl == 4) { + return true; + } + } catch(IOException e) { + // swallow + } + return false; + } + }, 500, 30000); + + BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); + BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); + + List allLocs = Lists.newArrayList(); + allLocs.addAll(Arrays.asList(blockLocs1)); + allLocs.addAll(Arrays.asList(blockLocs2)); + + // Stop a datanode to simulate a failure. + DataNodeProperties stoppedNode = cluster.stopDataNode(0); + // Fetch VolumeBlockLocations - BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays - .asList(blockLocs)); - - assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1, + BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(allLocs); + assertEquals("Expected two HdfsBlockLocation for two 1-block files", 2, locs.length); - + for (BlockStorageLocation l : locs) { assertEquals("Expected two replicas for each block", 2, + l.getHosts().length); + assertEquals("Expected two VolumeIDs for each block", 2, l.getVolumeIds().length); - assertTrue("Expected one valid and one invalid replica", - (l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid())); + assertTrue("Expected one valid and one invalid volume", + (l.getVolumeIds()[0] == null) ^ (l.getVolumeIds()[1] == null)); } + + // Start the datanode again, and remove one of the blocks. + // This is a different type of failure where the block itself + // is invalid. 
+ cluster.restartDataNode(stoppedNode, true /*keepPort*/); + cluster.waitActive(); + + fs.delete(tmpFile2, true); + HATestUtil.waitForNNToIssueDeletions(cluster.getNameNode()); + cluster.triggerHeartbeats(); + HATestUtil.waitForDNDeletions(cluster); + + locs = fs.getFileBlockStorageLocations(allLocs); + assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2, + locs.length); + assertNotNull(locs[0].getVolumeIds()[0]); + assertNotNull(locs[0].getVolumeIds()[1]); + assertNull(locs[1].getVolumeIds()[0]); + assertNull(locs[1].getVolumeIds()[1]); } finally { - cluster.shutdown(); + if (cluster != null) { + cluster.shutdown(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index cfade1b5db6..73e46665501 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1049,7 +1049,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) { } @Override - public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks) + public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 9e9c181c1f6..b8fe2c1a720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -111,7 +111,7 @@ public Boolean get() { * Wait for the NameNode to issue any deletions that are already * pending (i.e. for the pendingDeletionBlocksCount to go to 0) */ - static void waitForNNToIssueDeletions(final NameNode nn) + public static void waitForNNToIssueDeletions(final NameNode nn) throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 65ade2c57a3..74a38756cc5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -176,6 +176,12 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle is on in the shuffle-handler. (Jian He via vinodkv) + MAPREDUCE-5754. Preserve Job diagnostics in history (Gera Shegalov via + jlowe) + + MAPREDUCE-5766. Moved ping messages from TaskAttempts to be at DEBUG level + inside the ApplicationMaster log. 
(Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 03b6e52db8e..074e3f0fbb8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -360,12 +360,13 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID, if (taskStatus == null) { //We are using statusUpdate only as a simple ping - LOG.info("Ping from " + taskAttemptID.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Ping from " + taskAttemptID.toString()); + } return feedback; } // if we are here there is an actual status update to be processed - LOG.info("Status update from " + taskAttemptID.toString()); taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = @@ -453,7 +454,7 @@ public JvmTask getTask(JvmContext context) throws IOException { JVMId jvmId = context.jvmId; LOG.info("JVM with ID : " + jvmId + " asked for a task"); - + JvmTask jvmTask = null; // TODO: Is it an authorized container to get a task? Otherwise return null. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java index 3de5ba7e768..7ab02287b10 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; @@ -343,11 +344,12 @@ protected void serviceStop() throws Exception { LOG.warn("Found jobId " + toClose + " to have not been closed. Will close"); //Create a JobFinishEvent so that it is written to the job history + final Job job = context.getJob(toClose); JobUnsuccessfulCompletionEvent jucEvent = new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), - System.currentTimeMillis(), context.getJob(toClose) - .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(), - JobState.KILLED.toString()); + System.currentTimeMillis(), job.getCompletedMaps(), + job.getCompletedReduces(), JobState.KILLED.toString(), + job.getDiagnostics()); JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent); //Bypass the queue mechanism which might wait. 
Call the method directly handleEvent(jfEvent); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 88dc99eb365..36bfca71834 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -149,6 +149,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, // Maximum no. of fetch-failure notifications after which map task is failed private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; + + public static final String JOB_KILLED_DIAG = + "Job received Kill while in RUNNING state."; //final fields private final ApplicationAttemptId applicationAttemptId; @@ -1617,7 +1620,8 @@ private void unsuccessfulFinish(JobStateInternal finalState) { finishTime, succeededMapTaskCount, succeededReduceTaskCount, - finalState.toString()); + finalState.toString(), + diagnostics); eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent)); finished(finalState); @@ -1730,7 +1734,7 @@ public void transition(JobImpl job, JobEvent event) { JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - JobStateInternal.KILLED.toString()); + JobStateInternal.KILLED.toString(), job.diagnostics); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.finished(JobStateInternal.KILLED); } @@ -1763,7 +1767,7 @@ private static class KillTasksTransition implements SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { - job.addDiagnostic("Job received Kill while in RUNNING state."); + job.addDiagnostic(JOB_KILLED_DIAG); for (Task task : job.tasks.values()) { job.eventHandler.handle( new TaskEvent(task.getID(), TaskEventType.T_KILL)); @@ -2127,7 +2131,7 @@ public void transition(JobImpl job, JobEvent event) { JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - jobHistoryString); + jobHistoryString, job.diagnostics); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.finished(terminationState); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index d383f2912f9..59ba5b01810 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.junit.Test; public class TestEvents { @@ -334,11 +335,12 @@ private FakeEvent getSetupAttemptStartedEvent() { private FakeEvent 
getJobKilledEvent() { FakeEvent result = new FakeEvent(EventType.JOB_KILLED); JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion(); - datum.finishedMaps = 1; - datum.finishedReduces = 2; - datum.finishTime = 3; - datum.jobid = "ID"; - datum.jobStatus = "STATUS"; + datum.setFinishedMaps(1); + datum.setFinishedReduces(2); + datum.setFinishTime(3L); + datum.setJobid("ID"); + datum.setJobStatus("STATUS"); + datum.setDiagnostics(JobImpl.JOB_KILLED_DIAG); result.setDatum(datum); return result; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr index fa194338cf8..2d924d522b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr @@ -135,7 +135,8 @@ {"name": "finishTime", "type": "long"}, {"name": "finishedMaps", "type": "int"}, {"name": "finishedReduces", "type": "int"}, - {"name": "jobStatus", "type": "string"} + {"name": "jobStatus", "type": "string"}, + {"name": "diagnostics", "type": "string"} ] }, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 19e2a51a132..295de6373dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -353,10 +353,6 @@ private void handleTaskFailedEvent(TaskFailedEvent event) { taskInfo.error = StringInterner.weakIntern(event.getError()); taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); taskInfo.counters = event.getCounters(); - if (info.errorInfo.isEmpty()) { - info.errorInfo = "Task " + taskInfo.taskId + " failed " + - taskInfo.attemptsMap.size() + " times "; - } } private void handleTaskStartedEvent(TaskStartedEvent event) { @@ -373,6 +369,7 @@ private void handleJobFailedEvent(JobUnsuccessfulCompletionEvent event) { info.finishedMaps = event.getFinishedMaps(); info.finishedReduces = event.getFinishedReduces(); info.jobStatus = StringInterner.weakIntern(event.getStatus()); + info.errorInfo = StringInterner.weakIntern(event.getDiagnostics()); } private void handleJobFinishedEvent(JobFinishedEvent event) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java index 3adb91f2a8d..2d6a68e1d41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java @@ -18,11 +18,15 @@ package 
org.apache.hadoop.mapreduce.jobhistory; +import com.google.common.base.Joiner; + import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; +import java.util.Collections; + /** * Event to record Failed and Killed completion of jobs * @@ -30,6 +34,10 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class JobUnsuccessfulCompletionEvent implements HistoryEvent { + private static final String NODIAGS = ""; + private static final Iterable NODIAGS_LIST = + Collections.singletonList(NODIAGS); + private JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion(); @@ -44,11 +52,33 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent { public JobUnsuccessfulCompletionEvent(JobID id, long finishTime, int finishedMaps, int finishedReduces, String status) { - datum.jobid = new Utf8(id.toString()); - datum.finishTime = finishTime; - datum.finishedMaps = finishedMaps; - datum.finishedReduces = finishedReduces; - datum.jobStatus = new Utf8(status); + this(id, finishTime, finishedMaps, finishedReduces, status, NODIAGS_LIST); + } + + /** + * Create an event to record unsuccessful completion (killed/failed) of jobs + * @param id Job ID + * @param finishTime Finish time of the job + * @param finishedMaps Number of finished maps + * @param finishedReduces Number of finished reduces + * @param status Status of the job + * @param diagnostics job runtime diagnostics + */ + public JobUnsuccessfulCompletionEvent(JobID id, long finishTime, + int finishedMaps, + int finishedReduces, + String status, + Iterable diagnostics) { + datum.setJobid(new Utf8(id.toString())); + datum.setFinishTime(finishTime); + datum.setFinishedMaps(finishedMaps); + datum.setFinishedReduces(finishedReduces); + datum.setJobStatus(new Utf8(status)); + if (diagnostics == null) { + diagnostics = NODIAGS_LIST; + } + datum.setDiagnostics(new Utf8(Joiner.on('\n').skipNulls() + .join(diagnostics))); } JobUnsuccessfulCompletionEvent() {} @@ -61,13 +91,13 @@ public void setDatum(Object datum) { /** Get the Job ID */ public JobID getJobId() { return JobID.forName(datum.jobid.toString()); } /** Get the job finish time */ - public long getFinishTime() { return datum.finishTime; } + public long getFinishTime() { return datum.getFinishTime(); } /** Get the number of finished maps */ - public int getFinishedMaps() { return datum.finishedMaps; } + public int getFinishedMaps() { return datum.getFinishedMaps(); } /** Get the number of finished reduces */ - public int getFinishedReduces() { return datum.finishedReduces; } + public int getFinishedReduces() { return datum.getFinishedReduces(); } /** Get the status */ - public String getStatus() { return datum.jobStatus.toString(); } + public String getStatus() { return datum.getJobStatus().toString(); } /** Get the event type */ public EventType getEventType() { if ("FAILED".equals(getStatus())) { @@ -78,4 +108,13 @@ public EventType getEventType() { return EventType.JOB_KILLED; } + /** + * Retrieves diagnostics information preserved in the history file + * + * @return diagnostics as of the time of job termination + */ + public String getDiagnostics() { + final CharSequence diagnostics = datum.getDiagnostics(); + return diagnostics == null ? 
NODIAGS : diagnostics.toString(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml index 3ac191fb86b..ad887bba63a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml @@ -73,7 +73,7 @@ src/test/resources/job_1329348432655_0001_conf.xml - src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist + src/test/resources/*.jhist diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 3d704ef0530..8c0c4963f6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -18,6 +18,10 @@ package org.apache.hadoop.mapreduce.v2.hs; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic + .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -25,6 +29,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,9 +41,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobID; @@ -53,6 +58,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; @@ -66,8 +72,11 @@ import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl; import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import 
org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; @@ -149,7 +158,7 @@ private void checkHistoryParsing(final int numMaps, final int numReduces, conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); long amStartTimeEst = System.currentTimeMillis(); conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass() @@ -390,7 +399,7 @@ public void testHistoryParsingForFailedAttempts() throws Exception { try { Configuration conf = new Configuration(); conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this @@ -455,7 +464,7 @@ public void testCountersForFailedTask() throws Exception { try { Configuration conf = new Configuration(); conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this @@ -499,18 +508,85 @@ public void testCountersForFailedTask() throws Exception { Assert.assertNotNull("completed task report has null counters", ct .getReport().getCounters()); } + final List originalDiagnostics = job.getDiagnostics(); + final String historyError = jobInfo.getErrorInfo(); + assertTrue("No original diagnostics for a failed job", + originalDiagnostics != null && !originalDiagnostics.isEmpty()); + assertNotNull("No history error info for a failed job ", historyError); + for (String diagString : originalDiagnostics) { + assertTrue(historyError.contains(diagString)); + } } finally { LOG.info("FINISHED testCountersForFailedTask"); } } + @Test(timeout = 60000) + public void testDiagnosticsForKilledJob() throws Exception { + LOG.info("STARTING testDiagnosticsForKilledJob"); + try { + final Configuration conf = new Configuration(); + conf.setClass( + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + MyResolver.class, DNSToSwitchMapping.class); + RackResolver.init(conf); + MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this + .getClass().getName(), true); + app.submit(conf); + Job job = app.getContext().getAllJobs().values().iterator().next(); + JobId jobId = job.getID(); + app.waitForState(job, JobState.KILLED); + + // make sure all events are flushed + app.waitForState(Service.STATE.STOPPED); + + JobHistory jobHistory = new JobHistory(); + jobHistory.init(conf); + + HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); + + JobHistoryParser parser; + JobInfo jobInfo; + synchronized (fileInfo) { + Path historyFilePath = fileInfo.getHistoryFile(); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } + + parser = new JobHistoryParser(in); + jobInfo = parser.parse(); + } + Exception parseException = parser.getParseException(); + assertNull("Caught an expected exception " + parseException, + parseException); + final List originalDiagnostics = job.getDiagnostics(); + final String 
historyError = jobInfo.getErrorInfo(); + assertTrue("No original diagnostics for a failed job", + originalDiagnostics != null && !originalDiagnostics.isEmpty()); + assertNotNull("No history error info for a failed job ", historyError); + for (String diagString : originalDiagnostics) { + assertTrue(historyError.contains(diagString)); + } + assertTrue("No killed message in diagnostics", + historyError.contains(JobImpl.JOB_KILLED_DIAG)); + } finally { + LOG.info("FINISHED testDiagnosticsForKilledJob"); + } + } + @Test(timeout = 50000) public void testScanningOldDirs() throws Exception { LOG.info("STARTING testScanningOldDirs"); try { Configuration conf = new Configuration(); conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), @@ -590,6 +666,27 @@ protected void attemptLaunched(TaskAttemptId attemptID) { } } + static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory { + + public MRAppWithHistoryWithJobKilled(int maps, int reduces, + boolean autoComplete, String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + } + + @SuppressWarnings("unchecked") + @Override + protected void attemptLaunched(TaskAttemptId attemptID) { + if (attemptID.getTaskId().getId() == 0) { + getContext().getEventHandler().handle( + new JobEvent(attemptID.getTaskId().getJobId(), + JobEventType.JOB_KILL)); + } else { + getContext().getEventHandler().handle( + new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); + } + } + } + static class HistoryFileManagerForTest extends HistoryFileManager { void deleteJobFromJobListCache(HistoryFileInfo fileInfo) { jobListCache.delete(fileInfo); @@ -613,7 +710,7 @@ public void testDeleteFileInfo() throws Exception { Configuration conf = new Configuration(); conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); @@ -668,7 +765,7 @@ public void testJobHistoryMethods() throws Exception { Configuration configuration = new Configuration(); configuration .setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(configuration); @@ -743,7 +840,7 @@ public void testMultipleFailedTasks() throws Exception { final org.apache.hadoop.mapreduce.TaskType taskType = org.apache.hadoop.mapreduce.TaskType.MAP; final TaskID[] tids = new TaskID[2]; - JobID jid = new JobID("1", 1); + final JobID jid = new JobID("1", 1); tids[0] = new TaskID(jid, taskType, 0); tids[1] = new TaskID(jid, taskType, 1); Mockito.when(reader.getNextEvent()).thenAnswer( @@ -762,6 +859,13 @@ public HistoryEvent answer(InvocationOnMock invocation) tfe.setDatum(tfe.getDatum()); return tfe; } + if (eventId < 5) { + JobUnsuccessfulCompletionEvent juce = + new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0, + "JOB_FAILED", Collections.singletonList( + "Task failed: " + tids[0].toString())); + return juce; + } return null; } }); @@ -769,4 +873,22 @@ public HistoryEvent answer(InvocationOnMock invocation) assertTrue("Task 0 not implicated", info.getErrorInfo().contains(tids[0].toString())); } + + @Test + public void testFailedJobHistoryWithoutDiagnostics() throws Exception { 
+ final Path histPath = new Path(getClass().getClassLoader().getResource( + "job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist") + .getFile()); + final FileSystem lfs = FileSystem.getLocal(new Configuration()); + final FSDataInputStream fsdis = lfs.open(histPath); + try { + JobHistoryParser parser = new JobHistoryParser(fsdis); + JobInfo info = parser.parse(); + assertEquals("History parsed jobId incorrectly", + info.getJobId(), JobID.forName("job_1393307629410_0001") ); + assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo()); + } finally { + fsdis.close(); + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist new file mode 100644 index 00000000000..fafe451443f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist @@ -0,0 +1,19 @@ +Avro-Json +{"type":"record","name":"Event","namespace":"org.apache.hadoop.mapreduce.jobhistory","fields":[{"name":"type","type":{"type":"enum","name":"EventType","symbols":["JOB_SUBMITTED","JOB_INITED","JOB_FINISHED","JOB_PRIORITY_CHANGED","JOB_STATUS_CHANGED","JOB_FAILED","JOB_KILLED","JOB_ERROR","JOB_INFO_CHANGED","TASK_STARTED","TASK_FINISHED","TASK_FAILED","TASK_UPDATED","NORMALIZED_RESOURCE","MAP_ATTEMPT_STARTED","MAP_ATTEMPT_FINISHED","MAP_ATTEMPT_FAILED","MAP_ATTEMPT_KILLED","REDUCE_ATTEMPT_STARTED","REDUCE_ATTEMPT_FINISHED","REDUCE_ATTEMPT_FAILED","REDUCE_ATTEMPT_KILLED","SETUP_ATTEMPT_STARTED","SETUP_ATTEMPT_FINISHED","SETUP_ATTEMPT_FAILED","SETUP_ATTEMPT_KILLED","CLEANUP_ATTEMPT_STARTED","CLEANUP_ATTEMPT_FINISHED","CLEANUP_ATTEMPT_FAILED","CLEANUP_ATTEMPT_KILLED","AM_STARTED"]}},{"name":"event","type":[{"type":"record","name":"JobFinished","fields":[{"name":"jobid","type":"string"},{"name":"finishTime","type":"long"},{"name":"finishedMaps","type":"int"},{"name":"finishedReduces","type":"int"},{"name":"failedMaps","type":"int"},{"name":"failedReduces","type":"int"},{"name":"totalCounters","type":{"type":"record","name":"JhCounters","fields":[{"name":"name","type":"string"},{"name":"groups","type":{"type":"array","items":{"type":"record","name":"JhCounterGroup","fields":[{"name":"name","type":"string"},{"name":"displayName","type":"string"},{"name":"counts","type":{"type":"array","items":{"type":"record","name":"JhCounter","fields":[{"name":"name","type":"string"},{"name":"displayName","type":"string"},{"name":"value","type":"long"}]}}}]}}}]}},{"name":"mapCounters","type":"JhCounters"},{"name":"reduceCounters","type":"JhCounters"}]},{"type":"record","name":"JobInfoChange","fields":[{"name":"jobid","type":"string"},{"name":"submitTime","type":"long"},{"name":"launchTime","type":"long"}]},{"type":"record","name":"JobInited","fields":[{"name":"jobid","type":"string"},{"name":"launchTime","type":"long"},{"name":"totalMaps","type":"int"},{"name":"totalReduces","type":"int"},{"name":"jobStatus","type":"string"},{"name":"uberized","type":"boolean"}]},{"type":"record","name":"AMStarted","fields":[{"name":"applicationAttemptId","type":"string"},{"name":"startTime","
type":"long"},{"name":"containerId","type":"string"},{"name":"nodeManagerHost","type":"string"},{"name":"nodeManagerPort","type":"int"},{"name":"nodeManagerHttpPort","type":"int"}]},{"type":"record","name":"JobPriorityChange","fields":[{"name":"jobid","type":"string"},{"name":"priority","type":"string"}]},{"type":"record","name":"JobStatusChanged","fields":[{"name":"jobid","type":"string"},{"name":"jobStatus","type":"string"}]},{"type":"record","name":"JobSubmitted","fields":[{"name":"jobid","type":"string"},{"name":"jobName","type":"string"},{"name":"userName","type":"string"},{"name":"submitTime","type":"long"},{"name":"jobConfPath","type":"string"},{"name":"acls","type":{"type":"map","values":"string"}},{"name":"jobQueueName","type":"string"},{"name":"workflowId","type":"string"},{"name":"workflowName","type":"string"},{"name":"workflowNodeName","type":"string"},{"name":"workflowAdjacencies","type":"string"},{"name":"workflowTags","type":"string"}]},{"type":"record","name":"JobUnsuccessfulCompletion","fields":[{"name":"jobid","type":"string"},{"name":"finishTime","type":"long"},{"name":"finishedMaps","type":"int"},{"name":"finishedReduces","type":"int"},{"name":"jobStatus","type":"string"}]},{"type":"record","name":"MapAttemptFinished","fields":[{"name":"taskid","type":"string"},{"name":"attemptId","type":"string"},{"name":"taskType","type":"string"},{"name":"taskStatus","type":"string"},{"name":"mapFinishTime","type":"long"},{"name":"finishTime","type":"long"},{"name":"hostname","type":"string"},{"name":"port","type":"int"},{"name":"rackname","type":"string"},{"name":"state","type":"string"},{"name":"counters","type":"JhCounters"},{"name":"clockSplits","type":{"type":"array","items":"int"}},{"name":"cpuUsages","type":{"type":"array","items":"int"}},{"name":"vMemKbytes","type":{"type":"array","items":"int"}},{"name":"physMemKbytes","type":{"type":"array","items":"int"}}]},{"type":"record","name":"ReduceAttemptFinished","fields":[{"name":"taskid","type":"string"},{"name":"attemptId","type":"string"},{"name":"taskType","type":"string"},{"name":"taskStatus","type":"string"},{"name":"shuffleFinishTime","type":"long"},{"name":"sortFinishTime","type":"long"},{"name":"finishTime","type":"long"},{"name":"hostname","type":"string"},{"name":"port","type":"int"},{"name":"rackname","type":"string"},{"name":"state","type":"string"},{"name":"counters","type":"JhCounters"},{"name":"clockSplits","type":{"type":"array","items":"int"}},{"name":"cpuUsages","type":{"type":"array","items":"int"}},{"name":"vMemKbytes","type":{"type":"array","items":"int"}},{"name":"physMemKbytes","type":{"type":"array","items":"int"}}]},{"type":"record","name":"TaskAttemptFinished","fields":[{"name":"taskid","type":"string"},{"name":"attemptId","type":"string"},{"name":"taskType","type":"string"},{"name":"taskStatus","type":"string"},{"name":"finishTime","type":"long"},{"name":"rackname","type":"string"},{"name":"hostname","type":"string"},{"name":"state","type":"string"},{"name":"counters","type":"JhCounters"}]},{"type":"record","name":"TaskAttemptStarted","fields":[{"name":"taskid","type":"string"},{"name":"taskType","type":"string"},{"name":"attemptId","type":"string"},{"name":"startTime","type":"long"},{"name":"trackerName","type":"string"},{"name":"httpPort","type":"int"},{"name":"shufflePort","type":"int"},{"name":"containerId","type":"string"},{"name":"locality","type":"string"},{"name":"avataar","type":"string"}]},{"type":"record","name":"TaskAttemptUnsuccessfulCompletion","fields":[{"name":"taskid","type":"string"},{
"name":"taskType","type":"string"},{"name":"attemptId","type":"string"},{"name":"finishTime","type":"long"},{"name":"hostname","type":"string"},{"name":"port","type":"int"},{"name":"rackname","type":"string"},{"name":"status","type":"string"},{"name":"error","type":"string"},{"name":"counters","type":"JhCounters"},{"name":"clockSplits","type":{"type":"array","items":"int"}},{"name":"cpuUsages","type":{"type":"array","items":"int"}},{"name":"vMemKbytes","type":{"type":"array","items":"int"}},{"name":"physMemKbytes","type":{"type":"array","items":"int"}}]},{"type":"record","name":"TaskFailed","fields":[{"name":"taskid","type":"string"},{"name":"taskType","type":"string"},{"name":"finishTime","type":"long"},{"name":"error","type":"string"},{"name":"failedDueToAttempt","type":["null","string"]},{"name":"status","type":"string"},{"name":"counters","type":"JhCounters"}]},{"type":"record","name":"TaskFinished","fields":[{"name":"taskid","type":"string"},{"name":"taskType","type":"string"},{"name":"finishTime","type":"long"},{"name":"status","type":"string"},{"name":"counters","type":"JhCounters"},{"name":"successfulAttemptId","type":"string"}]},{"type":"record","name":"TaskStarted","fields":[{"name":"taskid","type":"string"},{"name":"taskType","type":"string"},{"name":"startTime","type":"long"},{"name":"splitLocations","type":"string"}]},{"type":"record","name":"TaskUpdated","fields":[{"name":"taskid","type":"string"},{"name":"finishTime","type":"long"}]}]}]} +{"type":"AM_STARTED","event":{"org.apache.hadoop.mapreduce.jobhistory.AMStarted":{"applicationAttemptId":"appattempt_1393307629410_0001_000001","startTime":1393307691014,"containerId":"container_1393307629410_0001_01_000001","nodeManagerHost":"172.25.142.198","nodeManagerPort":57763,"nodeManagerHttpPort":8042}}} + +{"type":"JOB_SUBMITTED","event":{"org.apache.hadoop.mapreduce.jobhistory.JobSubmitted":{"jobid":"job_1393307629410_0001","jobName":"Sleep job","userName":"user","submitTime":1393307687476,"jobConfPath":"hdfs://localhost:9000/tmp/hadoop-yarn/staging/user/.staging/job_1393307629410_0001/job.xml","acls":{},"jobQueueName":"default","workflowId":"","workflowName":"","workflowNodeName":"","workflowAdjacencies":"","workflowTags":""}}} + +{"type":"JOB_INITED","event":{"org.apache.hadoop.mapreduce.jobhistory.JobInited":{"jobid":"job_1393307629410_0001","launchTime":1393307693920,"totalMaps":1,"totalReduces":0,"jobStatus":"INITED","uberized":false}}} + +{"type":"JOB_INFO_CHANGED","event":{"org.apache.hadoop.mapreduce.jobhistory.JobInfoChange":{"jobid":"job_1393307629410_0001","submitTime":1393307687476,"launchTime":1393307693920}}} + +{"type":"TASK_STARTED","event":{"org.apache.hadoop.mapreduce.jobhistory.TaskStarted":{"taskid":"task_1393307629410_0001_m_000000","taskType":"MAP","startTime":1393307693956,"splitLocations":""}}} + +{"type":"MAP_ATTEMPT_STARTED","event":{"org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStarted":{"taskid":"task_1393307629410_0001_m_000000","taskType":"MAP","attemptId":"attempt_1393307629410_0001_m_000000_0","startTime":1393307696163,"trackerName":"172.25.142.198","httpPort":8042,"shufflePort":13562,"containerId":"container_1393307629410_0001_01_000002","locality":"OFF_SWITCH","avataar":"VIRGIN"}}} + 
+{"type":"MAP_ATTEMPT_FAILED","event":{"org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion":{"taskid":"task_1393307629410_0001_m_000000","taskType":"MAP","attemptId":"attempt_1393307629410_0001_m_000000_0","finishTime":1393307723803,"hostname":"172.25.142.198","port":57763,"rackname":"/default-rack","status":"FAILED","error":"AttemptID:attempt_1393307629410_0001_m_000000_0 Timed out after 5 secs","counters":{"name":"COUNTERS","groups":[{"name":"org.apache.hadoop.mapreduce.FileSystemCounter","displayName":"File System Counters","counts":[{"name":"FILE_BYTES_READ","displayName":"FILE: Number of bytes read","value":0},{"name":"FILE_BYTES_WRITTEN","displayName":"FILE: Number of bytes written","value":86662},{"name":"FILE_READ_OPS","displayName":"FILE: Number of read operations","value":0},{"name":"FILE_LARGE_READ_OPS","displayName":"FILE: Number of large read operations","value":0},{"name":"FILE_WRITE_OPS","displayName":"FILE: Number of write operations","value":0},{"name":"HDFS_BYTES_READ","displayName":"HDFS: Number of bytes read","value":48},{"name":"HDFS_BYTES_WRITTEN","displayName":"HDFS: Number of bytes written","value":0},{"name":"HDFS_READ_OPS","displayName":"HDFS: Number of read operations","value":1},{"name":"HDFS_LARGE_READ_OPS","displayName":"HDFS: Number of large read operations","value":0},{"name":"HDFS_WRITE_OPS","displayName":"HDFS: Number of write operations","value":0}]},{"name":"org.apache.hadoop.mapreduce.TaskCounter","displayName":"Map-Reduce Framework","counts":[{"name":"MAP_INPUT_RECORDS","displayName":"Map input records","value":1},{"name":"MAP_OUTPUT_RECORDS","displayName":"Map output records","value":0},{"name":"SPLIT_RAW_BYTES","displayName":"Input split bytes","value":48},{"name":"SPILLED_RECORDS","displayName":"Spilled Records","value":0},{"name":"FAILED_SHUFFLE","displayName":"Failed Shuffles","value":0},{"name":"MERGED_MAP_OUTPUTS","displayName":"Merged Map outputs","value":0},{"name":"GC_TIME_MILLIS","displayName":"GC time elapsed (ms)","value":13},{"name":"CPU_MILLISECONDS","displayName":"CPU time spent (ms)","value":0},{"name":"PHYSICAL_MEMORY_BYTES","displayName":"Physical memory (bytes) snapshot","value":0},{"name":"VIRTUAL_MEMORY_BYTES","displayName":"Virtual memory (bytes) snapshot","value":0},{"name":"COMMITTED_HEAP_BYTES","displayName":"Total committed heap usage (bytes)","value":85000192}]},{"name":"org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter","displayName":"File Input Format Counters ","counts":[{"name":"BYTES_READ","displayName":"Bytes Read","value":0}]},{"name":"org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter","displayName":"File Output Format Counters ","counts":[{"name":"BYTES_WRITTEN","displayName":"Bytes Written","value":0}]}]},"clockSplits":[747,747,748,747,747,748,747,747,748,747,747,748],"cpuUsages":[0,0,0,0,0,0,0,0,0,0,0,0],"vMemKbytes":[0,0,0,0,0,0,0,0,0,0,0,0],"physMemKbytes":[0,0,0,0,0,0,0,0,0,0,0,0]}}} + +{"type":"TASK_FAILED","event":{"org.apache.hadoop.mapreduce.jobhistory.TaskFailed":{"taskid":"task_1393307629410_0001_m_000000","taskType":"MAP","finishTime":1393307723803,"error":", AttemptID:attempt_1393307629410_0001_m_000000_0 Timed out after 5 secs","failedDueToAttempt":{"string":"attempt_1393307629410_0001_m_000000_0"},"status":"FAILED","counters":{"name":"COUNTERS","groups":[]}}}} + 
+{"type":"JOB_FAILED","event":{"org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletion":{"jobid":"job_1393307629410_0001","finishTime":1393307723835,"finishedMaps":0,"finishedReduces":0,"jobStatus":"FAILED"}}} diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e75fbba9b48..948fb6cd90d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -32,6 +32,9 @@ Release 2.5.0 - UNRELEASED YARN-1561. Fix a generic type warning in FairScheduler. (Chen He via junping_du) + YARN-1429. *nix: Allow a way for users to augment classpath of YARN daemons. + (Jarek Jarcec Cecho via kasha) + OPTIMIZATIONS BUG FIXES @@ -235,6 +238,9 @@ Release 2.4.0 - UNRELEASED YARN-1749. Updated application-history related configs to reflect the latest reality and to be consistently named. (Zhijie Shen via vinodkv) + YARN-1301. Added the INFO level log of the non-empty blacklist additions + and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn index 7b805ffe86f..68adb43d2ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -22,7 +22,12 @@ # # JAVA_HOME The java implementation to use. Overrides JAVA_HOME. # -# YARN_CLASSPATH Extra Java CLASSPATH entries. +# YARN_USER_CLASSPATH Additional user CLASSPATH entries. +# +# YARN_USER_CLASSPATH_FIRST If set to non empty value then the user classpath +# specified in YARN_USER_CLASSPATH will be +# appended at the beginning of YARN's final +# classpath instead of at the end. # # YARN_HEAPSIZE The maximum amount of heap to use, in MB. # Default is 1000. @@ -163,6 +168,17 @@ fi CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_DIR}/* CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_LIB_JARS_DIR}/* +# Add user defined YARN_USER_CLASSPATH to the class path (if defined) +if [ -n "$YARN_USER_CLASSPATH" ]; then + if [ -n "$YARN_USER_CLASSPATH_FIRST" ]; then + # User requested to add the custom entries at the beginning + CLASSPATH=${YARN_USER_CLASSPATH}:${CLASSPATH} + else + # By default we will just append the extra entries at the end + CLASSPATH=${CLASSPATH}:${YARN_USER_CLASSPATH} + fi +fi + # so that filenames w/ spaces are handled correctly in loops below IFS= @@ -249,4 +265,3 @@ if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then fi exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@" -fi diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index b5f90c6f38a..70fd69fdad3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -447,10 +448,10 @@ public 
AllocateResponse allocate(AllocateRequest request) request.getResourceBlacklistRequest(); List<String> blacklistAdditions = (blacklistRequest != null) ? - blacklistRequest.getBlacklistAdditions() : null; + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; List<String> blacklistRemovals = (blacklistRequest != null) ? - blacklistRequest.getBlacklistRemovals() : null; + blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; // sanity check try { @@ -487,6 +488,11 @@ public AllocateResponse allocate(AllocateRequest request) this.rScheduler.allocate(appAttemptId, ask, release, blacklistAdditions, blacklistRemovals); + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("Blacklist is updated in Scheduler. " + + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
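For reference, the ApplicationMasterService change above defaults an absent blacklist request to empty lists and emits an INFO line only when an update actually arrives. A minimal standalone sketch of that pattern follows; the class name BlacklistUpdateSketch and the method applyBlacklistUpdate are illustrative stand-ins rather than part of the patch, and System.out takes the place of the commons-logging LOG used in the real code.

import java.util.Collections;
import java.util.List;

public class BlacklistUpdateSketch {

  /**
   * Mirrors the patched allocate() flow: null blacklist lists are replaced
   * with empty lists so downstream code needs no null checks, and a log
   * line is produced only when at least one list is non-empty.
   */
  static void applyBlacklistUpdate(List<String> additions, List<String> removals) {
    List<String> blacklistAdditions =
        (additions != null) ? additions : Collections.<String>emptyList();
    List<String> blacklistRemovals =
        (removals != null) ? removals : Collections.<String>emptyList();

    // The real code hands both lists to the scheduler's allocate(...) at this point.

    if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
      System.out.println("Blacklist is updated in Scheduler. blacklistAdditions: "
          + blacklistAdditions + ", blacklistRemovals: " + blacklistRemovals);
    }
  }

  public static void main(String[] args) {
    applyBlacklistUpdate(null, null);                                      // prints nothing
    applyBlacklistUpdate(Collections.singletonList("badnode:8041"), null); // prints one line
  }
}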