Merge r1569890 through r1572417 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1572418 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-02-27 03:11:07 +00:00
commit 58bccdbc04
37 changed files with 575 additions and 305 deletions

View File

@@ -362,6 +362,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6018. Exception recorded in LOG when IPCLoggerChannel#close is called.
     (jing9)
+    HDFS-3969. Small bug fixes and improvements for disk locations API.
+    (Todd Lipcon and Andrew Wang)
   OPTIMIZATIONS
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@@ -17,18 +17,18 @@
  */
 package org.apache.hadoop.fs;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
+import com.google.common.base.Preconditions;
 /**
  * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
  * used to differentiate between the data directories on a single datanode. This
  * identifier is only unique on a per-datanode basis.
- *
- * Note that invalid IDs are represented by {@link VolumeId#INVALID_VOLUME_ID}.
  */
 @InterfaceStability.Unstable
 @InterfaceAudience.Public
@@ -37,28 +37,15 @@ public class HdfsVolumeId implements VolumeId {
   private final byte[] id;
   public HdfsVolumeId(byte[] id) {
-    if (id == null) {
-      throw new NullPointerException("A valid Id can only be constructed " +
-          "with a non-null byte array.");
-    }
+    Preconditions.checkNotNull(id, "id cannot be null");
     this.id = id;
   }
-  @Override
-  public final boolean isValid() {
-    return true;
-  }
   @Override
   public int compareTo(VolumeId arg0) {
     if (arg0 == null) {
       return 1;
     }
-    if (!arg0.isValid()) {
-      // any valid ID is greater
-      // than any invalid ID:
-      return 1;
-    }
     return hashCode() - arg0.hashCode();
   }
@@ -76,14 +63,11 @@ public boolean equals(Object obj) {
       return true;
     }
     HdfsVolumeId that = (HdfsVolumeId) obj;
-    // NB: if (!obj.isValid()) { return false; } check is not necessary
-    // because we have class identity checking above, and for this class
-    // isValid() is always true.
     return new EqualsBuilder().append(this.id, that.id).isEquals();
   }
   @Override
   public String toString() {
-    return Base64.encodeBase64String(id);
+    return StringUtils.byteToHexString(id);
   }
 }
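For illustration, a minimal sketch of how the reworked HdfsVolumeId behaves after this patch; the class and methods are the ones shown in the diff above, while the main() harness and the sample bytes are assumptions for demonstration only.

import org.apache.hadoop.fs.HdfsVolumeId;
import org.apache.hadoop.fs.VolumeId;

public class HdfsVolumeIdSketch {
  public static void main(String[] args) {
    // The constructor now rejects null ids via Preconditions.checkNotNull.
    VolumeId id1 = new HdfsVolumeId(new byte[] { 0x00, 0x01 });
    VolumeId id2 = new HdfsVolumeId(new byte[] { 0x00, 0x01 });

    // Equality is still based on the raw id bytes (EqualsBuilder).
    System.out.println(id1.equals(id2));  // true

    // toString() now renders the id as a hex string instead of Base64.
    System.out.println(id1);              // "0001"
  }
}

Since isValid() is gone, callers that previously checked it should now treat a null VolumeId as "unknown volume" (see the DistributedFileSystem javadoc change further down).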

View File

@@ -28,57 +28,6 @@
 @InterfaceAudience.Public
 public interface VolumeId extends Comparable<VolumeId> {
-  /**
-   * Represents an invalid Volume ID (ID for unknown content).
-   */
-  public static final VolumeId INVALID_VOLUME_ID = new VolumeId() {
-    @Override
-    public int compareTo(VolumeId arg0) {
-      // This object is equal only to itself;
-      // It is greater than null, and
-      // is always less than any other VolumeId:
-      if (arg0 == null) {
-        return 1;
-      }
-      if (arg0 == this) {
-        return 0;
-      } else {
-        return -1;
-      }
-    }
-    @Override
-    public boolean equals(Object obj) {
-      // this object is equal only to itself:
-      return (obj == this);
-    }
-    @Override
-    public int hashCode() {
-      return Integer.MIN_VALUE;
-    }
-    @Override
-    public boolean isValid() {
-      return false;
-    }
-    @Override
-    public String toString() {
-      return "Invalid VolumeId";
-    }
-  };
-  /**
-   * Indicates if the disk identifier is valid. Invalid identifiers indicate
-   * that the block was not present, or the location could otherwise not be
-   * determined.
-   *
-   * @return true if the disk identifier is valid
-   */
-  public boolean isValid();
   @Override
   abstract public int compareTo(VolumeId arg0);

View File

@ -19,10 +19,8 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -43,7 +41,6 @@
import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -51,16 +48,20 @@
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
class BlockStorageLocationUtil { class BlockStorageLocationUtil {
private static final Log LOG = LogFactory static final Log LOG = LogFactory
.getLog(BlockStorageLocationUtil.class); .getLog(BlockStorageLocationUtil.class);
/** /**
* Create a list of {@link VolumeBlockLocationCallable} corresponding to a set * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
* of datanodes and blocks. * of datanodes and blocks. The blocks must all correspond to the same
* block pool.
* *
* @param datanodeBlocks * @param datanodeBlocks
* Map of datanodes to block replicas at each datanode * Map of datanodes to block replicas at each datanode
@ -70,6 +71,11 @@ class BlockStorageLocationUtil {
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables( private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int timeout, boolean connectToDnViaHostname) { int timeout, boolean connectToDnViaHostname) {
if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList();
}
// Construct the callables, one per datanode // Construct the callables, one per datanode
List<VolumeBlockLocationCallable> callables = List<VolumeBlockLocationCallable> callables =
new ArrayList<VolumeBlockLocationCallable>(); new ArrayList<VolumeBlockLocationCallable>();
@ -78,17 +84,32 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
// Construct RPC parameters // Construct RPC parameters
DatanodeInfo datanode = entry.getKey(); DatanodeInfo datanode = entry.getKey();
List<LocatedBlock> locatedBlocks = entry.getValue(); List<LocatedBlock> locatedBlocks = entry.getValue();
List<ExtendedBlock> extendedBlocks = if (locatedBlocks.isEmpty()) {
new ArrayList<ExtendedBlock>(locatedBlocks.size()); continue;
}
// Ensure that the blocks all are from the same block pool.
String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
for (LocatedBlock lb : locatedBlocks) {
if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
throw new IllegalArgumentException(
"All blocks to be queried must be in the same block pool: " +
locatedBlocks.get(0).getBlock() + " and " + lb +
" are from different pools.");
}
}
long[] blockIds = new long[locatedBlocks.size()];
int i = 0;
List<Token<BlockTokenIdentifier>> dnTokens = List<Token<BlockTokenIdentifier>> dnTokens =
new ArrayList<Token<BlockTokenIdentifier>>( new ArrayList<Token<BlockTokenIdentifier>>(
locatedBlocks.size()); locatedBlocks.size());
for (LocatedBlock b : locatedBlocks) { for (LocatedBlock b : locatedBlocks) {
extendedBlocks.add(b.getBlock()); blockIds[i++] = b.getBlock().getBlockId();
dnTokens.add(b.getBlockToken()); dnTokens.add(b.getBlockToken());
} }
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable( VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, extendedBlocks, dnTokens, timeout, conf, datanode, poolId, blockIds, dnTokens, timeout,
connectToDnViaHostname); connectToDnViaHostname);
callables.add(callable); callables.add(callable);
} }
@ -102,18 +123,17 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
* *
* @param datanodeBlocks * @param datanodeBlocks
* Map of datanodes to the blocks present on the DN * Map of datanodes to the blocks present on the DN
* @return metadatas List of block metadata for each datanode, specifying * @return metadatas Map of datanodes to block metadata of the DN
* volume locations for each block
* @throws InvalidBlockTokenException * @throws InvalidBlockTokenException
* if client does not have read access on a requested block * if client does not have read access on a requested block
*/ */
static List<HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata( static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks, Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeout, boolean connectToDnViaHostname) int poolsize, int timeoutMs, boolean connectToDnViaHostname)
throws InvalidBlockTokenException { throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables = List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout, createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
connectToDnViaHostname); connectToDnViaHostname);
// Use a thread pool to execute the Callables in parallel // Use a thread pool to execute the Callables in parallel
@ -121,27 +141,24 @@ static List<HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
new ArrayList<Future<HdfsBlocksMetadata>>(); new ArrayList<Future<HdfsBlocksMetadata>>();
ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize); ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
try { try {
futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS); futures = executor.invokeAll(callables, timeoutMs,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Swallow the exception here, because we can return partial results // Swallow the exception here, because we can return partial results
} }
executor.shutdown(); executor.shutdown();
// Initialize metadatas list with nulls Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
// This is used to later indicate if we didn't get a response from a DN Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
List<HdfsBlocksMetadata> metadatas = new ArrayList<HdfsBlocksMetadata>();
for (int i = 0; i < futures.size(); i++) {
metadatas.add(null);
}
// Fill in metadatas with results from DN RPCs, where possible // Fill in metadatas with results from DN RPCs, where possible
for (int i = 0; i < futures.size(); i++) { for (int i = 0; i < futures.size(); i++) {
VolumeBlockLocationCallable callable = callables.get(i);
DatanodeInfo datanode = callable.getDatanodeInfo();
Future<HdfsBlocksMetadata> future = futures.get(i); Future<HdfsBlocksMetadata> future = futures.get(i);
try { try {
HdfsBlocksMetadata metadata = future.get(); HdfsBlocksMetadata metadata = future.get();
metadatas.set(i, metadata); metadatas.put(callable.getDatanodeInfo(), metadata);
} catch (ExecutionException e) { } catch (ExecutionException e) {
VolumeBlockLocationCallable callable = callables.get(i);
DatanodeInfo datanode = callable.getDatanodeInfo();
Throwable t = e.getCause(); Throwable t = e.getCause();
if (t instanceof InvalidBlockTokenException) { if (t instanceof InvalidBlockTokenException) {
LOG.warn("Invalid access token when trying to retrieve " LOG.warn("Invalid access token when trying to retrieve "
@ -153,8 +170,8 @@ else if (t instanceof UnsupportedOperationException) {
+ " required #getHdfsBlocksMetadata() API"); + " required #getHdfsBlocksMetadata() API");
throw (UnsupportedOperationException) t; throw (UnsupportedOperationException) t;
} else { } else {
LOG.info("Failed to connect to datanode " + LOG.info("Failed to query block locations on datanode " +
datanode.getIpcAddr(false)); datanode.getIpcAddr(false) + ": " + t);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Could not fetch information from datanode", t); LOG.debug("Could not fetch information from datanode", t);
@ -175,23 +192,21 @@ else if (t instanceof UnsupportedOperationException) {
* *
* @param blocks * @param blocks
* Original LocatedBlock array * Original LocatedBlock array
* @param datanodeBlocks
* Mapping from datanodes to the list of replicas on each datanode
* @param metadatas * @param metadatas
* VolumeId information for the replicas on each datanode * VolumeId information for the replicas on each datanode
* @return blockVolumeIds per-replica VolumeId information associated with the * @return blockVolumeIds per-replica VolumeId information associated with the
* parent LocatedBlock * parent LocatedBlock
*/ */
static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks( static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
List<LocatedBlock> blocks, Map<DatanodeInfo, List<LocatedBlock> blocks,
List<LocatedBlock>> datanodeBlocks, List<HdfsBlocksMetadata> metadatas) { Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
// Initialize mapping of ExtendedBlock to LocatedBlock. // Initialize mapping of ExtendedBlock to LocatedBlock.
// Used to associate results from DN RPCs to the parent LocatedBlock // Used to associate results from DN RPCs to the parent LocatedBlock
Map<ExtendedBlock, LocatedBlock> extBlockToLocBlock = Map<Long, LocatedBlock> blockIdToLocBlock =
new HashMap<ExtendedBlock, LocatedBlock>(); new HashMap<Long, LocatedBlock>();
for (LocatedBlock b : blocks) { for (LocatedBlock b : blocks) {
extBlockToLocBlock.put(b.getBlock(), b); blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
} }
// Initialize the mapping of blocks -> list of VolumeIds, one per replica // Initialize the mapping of blocks -> list of VolumeIds, one per replica
@ -200,9 +215,8 @@ static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
new HashMap<LocatedBlock, List<VolumeId>>(); new HashMap<LocatedBlock, List<VolumeId>>();
for (LocatedBlock b : blocks) { for (LocatedBlock b : blocks) {
ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length); ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
// Start off all IDs as invalid, fill it in later with results from RPCs
for (int i = 0; i < b.getLocations().length; i++) { for (int i = 0; i < b.getLocations().length; i++) {
l.add(VolumeId.INVALID_VOLUME_ID); l.add(null);
} }
blockVolumeIds.put(b, l); blockVolumeIds.put(b, l);
} }
@ -210,27 +224,28 @@ static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
// Iterate through the list of metadatas (one per datanode). // Iterate through the list of metadatas (one per datanode).
// For each metadata, if it's valid, insert its volume location information // For each metadata, if it's valid, insert its volume location information
// into the Map returned to the caller // into the Map returned to the caller
Iterator<HdfsBlocksMetadata> metadatasIter = metadatas.iterator(); for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
Iterator<DatanodeInfo> datanodeIter = datanodeBlocks.keySet().iterator(); DatanodeInfo datanode = entry.getKey();
while (metadatasIter.hasNext()) { HdfsBlocksMetadata metadata = entry.getValue();
HdfsBlocksMetadata metadata = metadatasIter.next();
DatanodeInfo datanode = datanodeIter.next();
// Check if metadata is valid // Check if metadata is valid
if (metadata == null) { if (metadata == null) {
continue; continue;
} }
ExtendedBlock[] metaBlocks = metadata.getBlocks(); long[] metaBlockIds = metadata.getBlockIds();
List<byte[]> metaVolumeIds = metadata.getVolumeIds(); List<byte[]> metaVolumeIds = metadata.getVolumeIds();
List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes(); List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
// Add VolumeId for each replica in the HdfsBlocksMetadata // Add VolumeId for each replica in the HdfsBlocksMetadata
for (int j = 0; j < metaBlocks.length; j++) { for (int j = 0; j < metaBlockIds.length; j++) {
int volumeIndex = metaVolumeIndexes.get(j); int volumeIndex = metaVolumeIndexes.get(j);
ExtendedBlock extBlock = metaBlocks[j]; long blockId = metaBlockIds[j];
// Skip if block wasn't found, or not a valid index into metaVolumeIds // Skip if block wasn't found, or not a valid index into metaVolumeIds
// Also skip if the DN responded with a block we didn't ask for // Also skip if the DN responded with a block we didn't ask for
if (volumeIndex == Integer.MAX_VALUE if (volumeIndex == Integer.MAX_VALUE
|| volumeIndex >= metaVolumeIds.size() || volumeIndex >= metaVolumeIds.size()
|| !extBlockToLocBlock.containsKey(extBlock)) { || !blockIdToLocBlock.containsKey(blockId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("No data for block " + blockId);
}
continue; continue;
} }
// Get the VolumeId by indexing into the list of VolumeIds // Get the VolumeId by indexing into the list of VolumeIds
@ -238,7 +253,7 @@ static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
byte[] volumeId = metaVolumeIds.get(volumeIndex); byte[] volumeId = metaVolumeIds.get(volumeIndex);
HdfsVolumeId id = new HdfsVolumeId(volumeId); HdfsVolumeId id = new HdfsVolumeId(volumeId);
// Find out which index we are in the LocatedBlock's replicas // Find out which index we are in the LocatedBlock's replicas
LocatedBlock locBlock = extBlockToLocBlock.get(extBlock); LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
DatanodeInfo[] dnInfos = locBlock.getLocations(); DatanodeInfo[] dnInfos = locBlock.getLocations();
int index = -1; int index = -1;
for (int k = 0; k < dnInfos.length; k++) { for (int k = 0; k < dnInfos.length; k++) {
@ -292,21 +307,23 @@ static BlockStorageLocation[] convertToVolumeBlockLocations(
private static class VolumeBlockLocationCallable implements private static class VolumeBlockLocationCallable implements
Callable<HdfsBlocksMetadata> { Callable<HdfsBlocksMetadata> {
private Configuration configuration; private final Configuration configuration;
private int timeout; private final int timeout;
private DatanodeInfo datanode; private final DatanodeInfo datanode;
private List<ExtendedBlock> extendedBlocks; private final String poolId;
private List<Token<BlockTokenIdentifier>> dnTokens; private final long[] blockIds;
private boolean connectToDnViaHostname; private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname;
VolumeBlockLocationCallable(Configuration configuration, VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, List<ExtendedBlock> extendedBlocks, DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout, List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
boolean connectToDnViaHostname) { boolean connectToDnViaHostname) {
this.configuration = configuration; this.configuration = configuration;
this.timeout = timeout; this.timeout = timeout;
this.datanode = datanode; this.datanode = datanode;
this.extendedBlocks = extendedBlocks; this.poolId = poolId;
this.blockIds = blockIds;
this.dnTokens = dnTokens; this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname; this.connectToDnViaHostname = connectToDnViaHostname;
} }
@ -323,7 +340,7 @@ public HdfsBlocksMetadata call() throws Exception {
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname); timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens); metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) { } catch (IOException e) {
// Bubble this up to the caller, handle with the Future // Bubble this up to the caller, handle with the Future
throw e; throw e;
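The utility above fans the per-datanode RPCs out to a thread pool and, as of this patch, interprets the timeout in milliseconds and collects the results into a map keyed by DatanodeInfo. A minimal sketch of that invokeAll pattern, using hypothetical names (ParallelRpcSketch, queryAll) rather than the real helper:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ParallelRpcSketch {
  static <T> List<Future<T>> queryAll(List<Callable<T>> callables,
      int poolSize, int timeoutMs) throws InterruptedException {
    ExecutorService executor = new ScheduledThreadPoolExecutor(poolSize);
    try {
      // Each callable that does not finish within timeoutMs is cancelled;
      // its Future.get() then throws rather than returning a result.
      return executor.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
    } finally {
      executor.shutdown();
    }
  }
}

In the updated code the per-datanode results land in a Map<DatanodeInfo, HdfsBlocksMetadata>, and a datanode with no entry simply contributes null volume IDs for its replicas instead of an error.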

View File

@ -241,6 +241,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
*/ */
public static class Conf { public static class Conf {
final int hdfsTimeout; // timeout value for a DFS operation. final int hdfsTimeout; // timeout value for a DFS operation.
final int maxFailoverAttempts; final int maxFailoverAttempts;
final int maxRetryAttempts; final int maxRetryAttempts;
final int failoverSleepBaseMillis; final int failoverSleepBaseMillis;
@ -267,7 +268,7 @@ public static class Conf {
final boolean connectToDnViaHostname; final boolean connectToDnViaHostname;
final boolean getHdfsBlocksMetadataEnabled; final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads; final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout; final int getFileBlockStorageLocationsTimeoutMs;
final int retryTimesForGetLastBlockLength; final int retryTimesForGetLastBlockLength;
final int retryIntervalForGetLastBlockLength; final int retryIntervalForGetLastBlockLength;
final long datanodeRestartTimeout; final long datanodeRestartTimeout;
@ -290,7 +291,6 @@ public static class Conf {
public Conf(Configuration conf) { public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf); hdfsTimeout = Client.getTimeout(conf);
maxFailoverAttempts = conf.getInt( maxFailoverAttempts = conf.getInt(
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
@ -349,9 +349,9 @@ public Conf(Configuration conf) {
getFileBlockStorageLocationsNumThreads = conf.getInt( getFileBlockStorageLocationsNumThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT); DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
getFileBlockStorageLocationsTimeout = conf.getInt( getFileBlockStorageLocationsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT, DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT);
retryTimesForGetLastBlockLength = conf.getInt( retryTimesForGetLastBlockLength = conf.getInt(
DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH, DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH,
DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
@ -1209,16 +1209,20 @@ public BlockStorageLocation[] getBlockStorageLocations(
} }
// Make RPCs to the datanodes to get volume locations for its replicas // Make RPCs to the datanodes to get volume locations for its replicas
List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
.queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
getConf().getFileBlockStorageLocationsNumThreads, getConf().getFileBlockStorageLocationsNumThreads,
getConf().getFileBlockStorageLocationsTimeout, getConf().getFileBlockStorageLocationsTimeoutMs,
getConf().connectToDnViaHostname); getConf().connectToDnViaHostname);
if (LOG.isTraceEnabled()) {
LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
}
// Regroup the returned VolumeId metadata to again be grouped by // Regroup the returned VolumeId metadata to again be grouped by
// LocatedBlock rather than by datanode // LocatedBlock rather than by datanode
Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
.associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas); .associateVolumeIdsWithBlocks(blocks, metadatas);
// Combine original BlockLocations with new VolumeId information // Combine original BlockLocations with new VolumeId information
BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil

View File

@@ -66,8 +66,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
   public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
   public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
-  public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout";
-  public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60;
+  public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = "dfs.client.file-block-storage-locations.timeout.millis";
+  public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000;
   public static final String DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.times.get-last-block-length";
   public static final int DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT = 3;
   public static final String DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.interval-ms.get-last-block-length";

View File

@@ -234,9 +234,9 @@ public BlockLocation[] next(final FileSystem fs, final Path p)
    * The returned array of {@link BlockStorageLocation} augments
    * {@link BlockLocation} with a {@link VolumeId} per block replica. The
    * VolumeId specifies the volume on the datanode on which the replica resides.
-   * The VolumeId has to be checked via {@link VolumeId#isValid()} before being
-   * used because volume information can be unavailable if the corresponding
-   * datanode is down or if the requested block is not found.
+   * The VolumeId associated with a replica may be null because volume
+   * information can be unavailable if the corresponding datanode is down or
+   * if the requested block is not found.
    *
    * This API is unstable, and datanode-side support is disabled by default. It
    * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
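A hedged usage sketch of the API described in this javadoc; the file path, the cast, and the lack of error handling are illustrative assumptions, and dfs.datanode.hdfs-blocks-metadata.enabled must be true on the datanodes for real volume information to come back.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class StorageLocationsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path file = new Path("/example.dat");  // hypothetical path
    BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
    BlockStorageLocation[] storageLocs =
        dfs.getFileBlockStorageLocations(Arrays.asList(locs));
    for (BlockStorageLocation l : storageLocs) {
      VolumeId[] ids = l.getVolumeIds();
      for (int i = 0; i < ids.length; i++) {
        // After HDFS-3969 an unknown volume is reported as null rather than
        // as a special "invalid" VolumeId.
        String volume = (ids[i] == null) ? "unknown" : ids[i].toString();
        System.out.println(l.getNames()[i] + " -> " + volume);
      }
    }
  }
}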

View File

@@ -137,7 +137,9 @@ private static void addDeprecatedKeys() {
       new DeprecationDelta("dfs.federation.nameservices",
         DFSConfigKeys.DFS_NAMESERVICES),
       new DeprecationDelta("dfs.federation.nameservice.id",
-        DFSConfigKeys.DFS_NAMESERVICE_ID)
+        DFSConfigKeys.DFS_NAMESERVICE_ID),
+      new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
+        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS)
     });
   }
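With the DeprecationDelta added above, configurations that still use the old key name keep resolving. A small sketch of that behaviour, under the assumption that only the key name is remapped; the numeric value passes through unchanged, so a value that used to mean seconds has to be rewritten in milliseconds by the operator.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DeprecatedTimeoutKeySketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Old key name; the value is intended as milliseconds under the new semantics.
    conf.set("dfs.client.file-block-storage-locations.timeout", "1500");
    // Resolves through the mapping registered in addDeprecatedKeys().
    System.out.println(
        conf.get("dfs.client.file-block-storage-locations.timeout.millis"));
  }
}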

View File

@@ -113,7 +113,8 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
    * This is in the form of an opaque {@link VolumeId} for each configured
    * data directory, which is not guaranteed to be the same across DN restarts.
    *
-   * @param blocks
+   * @param blockPoolId the pool to query
+   * @param blockIds
    *          list of blocks on the local datanode
    * @param tokens
    *          block access tokens corresponding to the requested blocks
@@ -122,8 +123,8 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
    * @throws IOException
    *          if datanode is unreachable, or replica is not found on datanode
    */
-  HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
-      List<Token<BlockTokenIdentifier>> tokens) throws IOException;
+  HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
+      long []blockIds, List<Token<BlockTokenIdentifier>> tokens) throws IOException;
   /**
    * Shuts down a datanode.
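A compile-oriented sketch of calling the revised RPC; the query() wrapper, the caller-supplied proxy, and the tokens are assumptions, while the method signature is the one introduced above (proxies are built with DFSUtil.createClientDatanodeProtocolProxy, as in BlockStorageLocationUtil).

import java.util.List;

import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

public class HdfsBlocksMetadataCallSketch {
  // Queries one datanode for the volumes of the given blocks. The caller is
  // expected to have built the proxy and collected one block token per block id.
  static HdfsBlocksMetadata query(ClientDatanodeProtocol cdp,
      String blockPoolId, long[] blockIds,
      List<Token<BlockTokenIdentifier>> tokens) throws Exception {
    return cdp.getHdfsBlocksMetadata(blockPoolId, blockIds, tokens);
  }
}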

View File

@@ -22,6 +22,10 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 /**
  * Augments an array of blocks on a datanode with additional information about
  * where the block is stored.
@@ -30,10 +34,13 @@
 @InterfaceStability.Unstable
 public class HdfsBlocksMetadata {
+  /** The block pool that was queried */
+  private final String blockPoolId;
   /**
    * List of blocks
    */
-  private final ExtendedBlock[] blocks;
+  private final long[] blockIds;
   /**
    * List of volumes
@@ -50,7 +57,7 @@ public class HdfsBlocksMetadata {
   /**
    * Constructs HdfsBlocksMetadata.
    *
-   * @param blocks
+   * @param blockIds
    *          List of blocks described
    * @param volumeIds
    *          List of potential volume identifiers, specifying volumes where
@@ -58,9 +65,13 @@
    * @param volumeIndexes
    *          Indexes into the list of volume identifiers, one per block
    */
-  public HdfsBlocksMetadata(ExtendedBlock[] blocks, List<byte[]> volumeIds,
+  public HdfsBlocksMetadata(String blockPoolId,
+      long[] blockIds, List<byte[]> volumeIds,
       List<Integer> volumeIndexes) {
-    this.blocks = blocks;
+    Preconditions.checkArgument(blockIds.length == volumeIndexes.size(),
+        "Argument lengths should match");
+    this.blockPoolId = blockPoolId;
+    this.blockIds = blockIds;
     this.volumeIds = volumeIds;
     this.volumeIndexes = volumeIndexes;
   }
@@ -70,8 +81,8 @@ public HdfsBlocksMetadata(ExtendedBlock[] blocks, List<byte[]> volumeIds,
    *
    * @return array of blocks
    */
-  public ExtendedBlock[] getBlocks() {
-    return blocks;
+  public long[] getBlockIds() {
+    return blockIds;
   }
   /**
@@ -91,4 +102,10 @@ public List<byte[]> getVolumeIds() {
   public List<Integer> getVolumeIndexes() {
     return volumeIndexes;
   }
+  @Override
+  public String toString() {
+    return "Metadata for " + blockIds.length + " blocks in " +
+        blockPoolId + ": " + Joiner.on(",").join(Longs.asList(blockIds));
+  }
 }
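HdfsBlocksMetadata now carries three parallel pieces of state: the block pool, a long[] of block ids, and one volume index per block pointing into the list of volume-id byte arrays. A small decoding sketch, assuming the Integer.MAX_VALUE / out-of-range convention for "block not found" that BlockStorageLocationUtil checks above:

import java.util.List;

import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;

public class HdfsBlocksMetadataDecodeSketch {
  // Prints the volume of each block, mirroring the decoding done in
  // BlockStorageLocationUtil. An out-of-range index means no volume was found.
  static void decode(HdfsBlocksMetadata metadata) {
    long[] blockIds = metadata.getBlockIds();
    List<byte[]> volumeIds = metadata.getVolumeIds();
    List<Integer> volumeIndexes = metadata.getVolumeIndexes();
    for (int i = 0; i < blockIds.length; i++) {
      int idx = volumeIndexes.get(i);
      if (idx == Integer.MAX_VALUE || idx >= volumeIds.size()) {
        System.out.println(blockIds[i] + ": volume unknown");
      } else {
        System.out.println(blockIds[i] + ": volume #" + idx
            + " (" + volumeIds.get(idx).length + " id bytes)");
      }
    }
  }
}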

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -132,19 +133,17 @@ public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
throws ServiceException { throws ServiceException {
HdfsBlocksMetadata resp; HdfsBlocksMetadata resp;
try { try {
// Construct the Lists to make the actual call String poolId = request.getBlockPoolId();
List<ExtendedBlock> blocks =
new ArrayList<ExtendedBlock>(request.getBlocksCount());
for (ExtendedBlockProto b : request.getBlocksList()) {
blocks.add(PBHelper.convert(b));
}
List<Token<BlockTokenIdentifier>> tokens = List<Token<BlockTokenIdentifier>> tokens =
new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount()); new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
for (TokenProto b : request.getTokensList()) { for (TokenProto b : request.getTokensList()) {
tokens.add(PBHelper.convert(b)); tokens.add(PBHelper.convert(b));
} }
long[] blockIds = Longs.toArray(request.getBlockIdsList());
// Call the real implementation // Call the real implementation
resp = impl.getHdfsBlocksMetadata(blocks, tokens); resp = impl.getHdfsBlocksMetadata(poolId, blockIds, tokens);
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import javax.net.SocketFactory; import javax.net.SocketFactory;
@ -61,6 +62,7 @@
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -223,23 +225,19 @@ public Object getUnderlyingProxyObject() {
} }
@Override @Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks, public HdfsBlocksMetadata getHdfsBlocksMetadata(String blockPoolId,
long[] blockIds,
List<Token<BlockTokenIdentifier>> tokens) throws IOException { List<Token<BlockTokenIdentifier>> tokens) throws IOException {
// Convert to proto objects
List<ExtendedBlockProto> blocksProtos =
new ArrayList<ExtendedBlockProto>(blocks.size());
List<TokenProto> tokensProtos = List<TokenProto> tokensProtos =
new ArrayList<TokenProto>(tokens.size()); new ArrayList<TokenProto>(tokens.size());
for (ExtendedBlock b : blocks) {
blocksProtos.add(PBHelper.convert(b));
}
for (Token<BlockTokenIdentifier> t : tokens) { for (Token<BlockTokenIdentifier> t : tokens) {
tokensProtos.add(PBHelper.convert(t)); tokensProtos.add(PBHelper.convert(t));
} }
// Build the request // Build the request
GetHdfsBlockLocationsRequestProto request = GetHdfsBlockLocationsRequestProto request =
GetHdfsBlockLocationsRequestProto.newBuilder() GetHdfsBlockLocationsRequestProto.newBuilder()
.addAllBlocks(blocksProtos) .setBlockPoolId(blockPoolId)
.addAllBlockIds(Longs.asList(blockIds))
.addAllTokens(tokensProtos) .addAllTokens(tokensProtos)
.build(); .build();
// Send the RPC // Send the RPC
@ -258,7 +256,7 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
// Array of indexes into the list of volumes, one per block // Array of indexes into the list of volumes, one per block
List<Integer> volumeIndexes = response.getVolumeIndexesList(); List<Integer> volumeIndexes = response.getVolumeIndexesList();
// Parsed HdfsVolumeId values, one per block // Parsed HdfsVolumeId values, one per block
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), return new HdfsBlocksMetadata(blockPoolId, blockIds,
volumeIds, volumeIndexes); volumeIds, volumeIndexes);
} }

View File

@@ -1136,22 +1136,23 @@ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
   }
   @Override
-  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(
+      String bpId, long[] blockIds,
       List<Token<BlockTokenIdentifier>> tokens) throws IOException,
       UnsupportedOperationException {
     if (!getHdfsBlockLocationsEnabled) {
       throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
           + " is not enabled in datanode config");
     }
-    if (blocks.size() != tokens.size()) {
+    if (blockIds.length != tokens.size()) {
       throw new IOException("Differing number of blocks and tokens");
     }
     // Check access for each block
-    for (int i = 0; i < blocks.size(); i++) {
-      checkBlockToken(blocks.get(i), tokens.get(i),
-          BlockTokenSecretManager.AccessMode.READ);
+    for (int i = 0; i < blockIds.length; i++) {
+      checkBlockToken(new ExtendedBlock(bpId, blockIds[i]),
+          tokens.get(i), BlockTokenSecretManager.AccessMode.READ);
     }
-    return data.getHdfsBlocksMetadata(blocks);
+    return data.getHdfsBlocksMetadata(bpId, blockIds);
   }
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,

View File

@@ -405,12 +405,13 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
    * Get a {@link HdfsBlocksMetadata} corresponding to the list of blocks in
    * <code>blocks</code>.
    *
-   * @param blocks List of blocks for which to return metadata
+   * @param bpid pool to query
+   * @param blockIds List of block ids for which to return metadata
    * @return metadata Metadata for the list of blocks
    * @throws IOException
    */
-  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
-      throws IOException;
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
+      long[] blockIds) throws IOException;
   /**
    * Enable 'trash' for the given dataset. When trash is enabled, files are

View File

@ -1822,31 +1822,35 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(String poolId,
throws IOException { long[] blockIds) throws IOException {
// List of VolumeIds, one per volume on the datanode // List of VolumeIds, one per volume on the datanode
List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size()); List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
// List of indexes into the list of VolumeIds, pointing at the VolumeId of // List of indexes into the list of VolumeIds, pointing at the VolumeId of
// the volume that the block is on // the volume that the block is on
List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size()); List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blockIds.length);
// Initialize the list of VolumeIds simply by enumerating the volumes // Initialize the list of VolumeIds simply by enumerating the volumes
for (int i = 0; i < volumes.volumes.size(); i++) { for (int i = 0; i < volumes.volumes.size(); i++) {
blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array()); blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
} }
// Determine the index of the VolumeId of each block's volume, by comparing // Determine the index of the VolumeId of each block's volume, by comparing
// the block's volume against the enumerated volumes // the block's volume against the enumerated volumes
for (int i = 0; i < blocks.size(); i++) { for (int i = 0; i < blockIds.length; i++) {
ExtendedBlock block = blocks.get(i); long blockId = blockIds[i];
FsVolumeSpi blockVolume = getReplicaInfo(block).getVolume();
boolean isValid = false; boolean isValid = false;
ReplicaInfo info = volumeMap.get(poolId, blockId);
int volumeIndex = 0; int volumeIndex = 0;
for (FsVolumeImpl volume : volumes.volumes) { if (info != null) {
// This comparison of references should be safe FsVolumeSpi blockVolume = info.getVolume();
if (blockVolume == volume) { for (FsVolumeImpl volume : volumes.volumes) {
isValid = true; // This comparison of references should be safe
break; if (blockVolume == volume) {
isValid = true;
break;
}
volumeIndex++;
} }
volumeIndex++;
} }
// Indicates that the block is not present, or not found in a data dir // Indicates that the block is not present, or not found in a data dir
if (!isValid) { if (!isValid) {
@ -1854,7 +1858,7 @@ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
} }
blocksVolumeIndexes.add(volumeIndex); blocksVolumeIndexes.add(volumeIndex);
} }
return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), return new HdfsBlocksMetadata(poolId, blockIds,
blocksVolumeIds, blocksVolumeIndexes); blocksVolumeIds, blocksVolumeIndexes);
} }
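FsDatasetImpl identifies each volume simply by its position in its volume list, encoded as four big-endian bytes. A tiny round-trip sketch of that encoding; the decode direction is illustrative only, since real clients treat the bytes as an opaque HdfsVolumeId.

import java.nio.ByteBuffer;

public class VolumeIdBytesSketch {
  public static void main(String[] args) {
    // One 4-byte id per volume, as produced in getHdfsBlocksMetadata above.
    byte[] encoded = ByteBuffer.allocate(4).putInt(2).array();
    // Reversing the encoding recovers the volume's index in the list.
    int volumeIndex = ByteBuffer.wrap(encoded).getInt();
    System.out.println(volumeIndex); // 2
  }
}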

View File

@@ -102,12 +102,18 @@ message GetBlockLocalPathInfoResponseProto {
 }
 /**
- * blocks - list of ExtendedBlocks on which we are querying additional info
- * tokens - list of access tokens corresponding to list of ExtendedBlocks
+ * Query for the disk locations of a number of blocks on this DN.
+ * blockPoolId - the pool to query
+ * blockIds - list of block IDs to query
+ * tokens - list of access tokens corresponding to list of block IDs
  */
 message GetHdfsBlockLocationsRequestProto {
-  repeated ExtendedBlockProto blocks = 1;
+  // Removed: HDFS-3969
+  // repeated ExtendedBlockProto blocks = 1;
   repeated hadoop.common.TokenProto tokens = 2;
+  required string blockPoolId = 3;
+  repeated sfixed64 blockIds = 4 [ packed = true ];
 }
@@ -118,7 +124,7 @@ message GetHdfsBlockLocationsRequestProto {
  */
 message GetHdfsBlockLocationsResponseProto {
   repeated bytes volumeIds = 1;
-  repeated uint32 volumeIndexes = 2;
+  repeated uint32 volumeIndexes = 2 [ packed = true ];
 }
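For reference, a sketch of how a client builds the reworked request message, mirroring the builder calls in ClientDatanodeProtocolTranslatorPB above; the fully qualified name of the generated outer class is assumed here.

import java.util.List;

import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;

import com.google.common.primitives.Longs;

public class RequestProtoSketch {
  // Builds the new-style request: a block pool id plus packed block ids.
  static GetHdfsBlockLocationsRequestProto build(String blockPoolId,
      long[] blockIds, List<TokenProto> tokenProtos) {
    return GetHdfsBlockLocationsRequestProto.newBuilder()
        .setBlockPoolId(blockPoolId)
        .addAllBlockIds(Longs.asList(blockIds))
        .addAllTokens(tokenProtos)
        .build();
  }
}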
/** /**

View File

@@ -1321,10 +1321,10 @@
 </property>
 <property>
-  <name>dfs.client.file-block-storage-locations.timeout</name>
-  <value>60</value>
+  <name>dfs.client.file-block-storage-locations.timeout.millis</name>
+  <value>1000</value>
   <description>
-    Timeout (in seconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
+    Timeout (in milliseconds) for the parallel RPCs made in DistributedFileSystem#getFileBlockStorageLocations().
   </description>
 </property>
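A small sketch of tuning the renamed property from code using the DFSConfigKeys constants introduced in this patch; the 5000 ms figure is just an example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class TimeoutConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // 1000 ms is the new default; raise it for slow or heavily loaded clusters.
    conf.setInt(
        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS,
        5000);
    // Datanode-side support must also be enabled for the RPCs to succeed.
    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
  }
}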

View File

@ -123,56 +123,19 @@ private <T> void testEqMany(final boolean eq, Comparable<T>... volumeIds) {
@Test @Test
public void testIdEmptyBytes() { public void testIdEmptyBytes() {
final VolumeId idEmpty1 = new HdfsVolumeId(new byte[0]); final VolumeId idEmpty1 = new HdfsVolumeId(new byte[0]);
assertTrue(idEmpty1.isValid());
final VolumeId idEmpty2 = new HdfsVolumeId(new byte[0]); final VolumeId idEmpty2 = new HdfsVolumeId(new byte[0]);
assertTrue(idEmpty2.isValid());
final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 }); final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 });
assertTrue(idNotEmpty.isValid());
testEq(true, idEmpty1, idEmpty2); testEq(true, idEmpty1, idEmpty2);
testEq(false, idEmpty1, idNotEmpty); testEq(false, idEmpty1, idNotEmpty);
testEq(false, idEmpty2, idNotEmpty); testEq(false, idEmpty2, idNotEmpty);
} }
/*
* Test the VolumeId.INVALID_VOLUME_ID singleton.
*/
@Test
public void testInvalidId() {
try {
new HdfsVolumeId(null);
assertTrue("NPE expected.", false);
} catch (NullPointerException npe) {
// okay
}
final VolumeId idEmpty = new HdfsVolumeId(new byte[] {});
final VolumeId idNotEmpty = new HdfsVolumeId(new byte[] { (byte)1 });
testEq(false, VolumeId.INVALID_VOLUME_ID, idNotEmpty);
testEq(false, VolumeId.INVALID_VOLUME_ID, idEmpty);
testEqMany(true,
new VolumeId[] {
VolumeId.INVALID_VOLUME_ID,
VolumeId.INVALID_VOLUME_ID,
VolumeId.INVALID_VOLUME_ID } );
testEqMany(false,
new VolumeId[] {
VolumeId.INVALID_VOLUME_ID,
idEmpty,
idNotEmpty });
}
/* /*
* test #toString() for typical VolumeId equality classes * test #toString() for typical VolumeId equality classes
*/ */
@Test @Test
public void testToString() { public void testToString() {
// The #toString() return value is only checked for != null.
// We cannot assert more.
String strInvalid = VolumeId.INVALID_VOLUME_ID.toString();
assertNotNull(strInvalid);
String strEmpty = new HdfsVolumeId(new byte[] {}).toString(); String strEmpty = new HdfsVolumeId(new byte[] {}).toString();
assertNotNull(strEmpty); assertNotNull(strEmpty);

View File

@ -399,13 +399,15 @@ public class DataNodeProperties {
Configuration conf; Configuration conf;
String[] dnArgs; String[] dnArgs;
SecureResources secureResources; SecureResources secureResources;
int ipcPort;
DataNodeProperties(DataNode node, Configuration conf, String[] args, DataNodeProperties(DataNode node, Configuration conf, String[] args,
SecureResources secureResources) { SecureResources secureResources, int ipcPort) {
this.datanode = node; this.datanode = node;
this.conf = conf; this.conf = conf;
this.dnArgs = args; this.dnArgs = args;
this.secureResources = secureResources; this.secureResources = secureResources;
this.ipcPort = ipcPort;
} }
public void setDnArgs(String ... args) { public void setDnArgs(String ... args) {
@ -1301,7 +1303,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
racks[i-curDatanodesNum]); racks[i-curDatanodesNum]);
} }
dn.runDatanodeDaemon(); dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs,
secureResources, dn.getIpcPort()));
} }
curDatanodesNum += numDataNodes; curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes; this.numDataNodes += numDataNodes;
@ -1758,10 +1761,12 @@ public synchronized boolean restartDataNode(DataNodeProperties dnprop,
InetSocketAddress addr = dnprop.datanode.getXferAddress(); InetSocketAddress addr = dnprop.datanode.getXferAddress();
conf.set(DFS_DATANODE_ADDRESS_KEY, conf.set(DFS_DATANODE_ADDRESS_KEY,
addr.getAddress().getHostAddress() + ":" + addr.getPort()); addr.getAddress().getHostAddress() + ":" + addr.getPort());
conf.set(DFS_DATANODE_IPC_ADDRESS_KEY,
addr.getAddress().getHostAddress() + ":" + dnprop.ipcPort);
} }
DataNode newDn = DataNode.createDataNode(args, conf, secureResources);
dataNodes.add(new DataNodeProperties( dataNodes.add(new DataNodeProperties(
DataNode.createDataNode(args, conf, secureResources), newDn, newconf, args, secureResources, newDn.getIpcPort()));
newconf, args, secureResources));
numDataNodes++; numDataNodes++;
return true; return true;
} }

View File

@ -178,7 +178,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
} }
} }
dn.runDatanodeDaemon(); dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources)); dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources, dn.getIpcPort()));
} }
curDatanodesNum += numDataNodes; curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes; this.numDataNodes += numDataNodes;
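DataNodeProperties now records the datanode's IPC port so that restartDataNode can rebind the same IPC address when the caller asks to keep ports, which is what lets the test below restart a stopped datanode and have clients reconnect. A minimal sketch of that pattern in a test, assuming an otherwise default mini-cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;

public class RestartKeepPortSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
    try {
      // Stop one datanode, then bring it back on the same ports. With the
      // recorded ipcPort, keepPort=true restores the IPC address as well,
      // so ClientDatanodeProtocol callers can retry against the same target.
      DataNodeProperties dnprop = cluster.stopDataNode(0);
      cluster.restartDataNode(dnprop, true /* keepPort */);
      cluster.waitActive();
    } finally {
      cluster.shutdown();
    }
  }
}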

View File

@ -20,6 +20,8 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -43,6 +45,7 @@
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation; import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
@ -56,15 +59,22 @@
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.HftpFileSystem; import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Test; import org.junit.Test;
import org.mockito.InOrder; import org.mockito.InOrder;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
public class TestDistributedFileSystem { public class TestDistributedFileSystem {
private static final Random RAN = new Random(); private static final Random RAN = new Random();
@ -684,20 +694,47 @@ public void testAllWithNoXmlDefaults() throws Exception {
* {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
* call * call
*/ */
@Test @Test(timeout=60000)
public void testGetFileBlockStorageLocationsBatching() throws Exception { public void testGetFileBlockStorageLocationsBatching() throws Exception {
final Configuration conf = getTestConfiguration(); final Configuration conf = getTestConfiguration();
((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.TRACE);
((Log4JLogger)BlockStorageLocationUtil.LOG).getLogger().setLevel(Level.TRACE);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.TRACE);
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
true); true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build(); .numDataNodes(2).build();
try { try {
DistributedFileSystem fs = cluster.getFileSystem(); final DistributedFileSystem fs = cluster.getFileSystem();
// Create two files // Create two files
Path tmpFile1 = new Path("/tmpfile1.dat"); final Path tmpFile1 = new Path("/tmpfile1.dat");
Path tmpFile2 = new Path("/tmpfile2.dat"); final Path tmpFile2 = new Path("/tmpfile2.dat");
DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl); DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl); DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Make sure files are fully replicated before continuing
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<BlockLocation> list = Lists.newArrayList();
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0,
1024)));
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0,
1024)));
int totalRepl = 0;
for (BlockLocation loc : list) {
totalRepl += loc.getHosts().length;
}
if (totalRepl == 4) {
return true;
}
} catch(IOException e) {
// swallow
}
return false;
}
}, 500, 30000);
// Get locations of blocks of both files and concat together // Get locations of blocks of both files and concat together
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024); BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024); BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
@ -728,7 +765,7 @@ public void testGetFileBlockStorageLocationsBatching() throws Exception {
VolumeId id = l.getVolumeIds()[i]; VolumeId id = l.getVolumeIds()[i];
String name = l.getNames()[i]; String name = l.getNames()[i];
assertTrue("Expected block to be valid on datanode " + name, assertTrue("Expected block to be valid on datanode " + name,
id.isValid()); id != null);
} }
} }
} finally { } finally {
@ -740,38 +777,97 @@ public void testGetFileBlockStorageLocationsBatching() throws Exception {
* Tests error paths for * Tests error paths for
* {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)} * {@link DistributedFileSystem#getFileBlockStorageLocations(java.util.List)}
*/ */
@Test @Test(timeout=60000)
public void testGetFileBlockStorageLocationsError() throws Exception { public void testGetFileBlockStorageLocationsError() throws Exception {
final Configuration conf = getTestConfiguration(); final Configuration conf = getTestConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
true); true);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) conf.setInt(
.numDataNodes(2).build(); CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
MiniDFSCluster cluster = null;
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.getDataNodes(); cluster.getDataNodes();
DistributedFileSystem fs = cluster.getFileSystem(); final DistributedFileSystem fs = cluster.getFileSystem();
// Create a file
Path tmpFile = new Path("/tmpfile1.dat"); // Create a few files and add together their block locations into
DFSTestUtil.createFile(fs, tmpFile, 1024, (short) 2, 0xDEADDEADl); // a list.
// Get locations of blocks of the file final Path tmpFile1 = new Path("/errorfile1.dat");
BlockLocation[] blockLocs = fs.getFileBlockLocations(tmpFile, 0, 1024); final Path tmpFile2 = new Path("/errorfile2.dat");
// Stop a datanode to simulate a failure
cluster.stopDataNode(0); DFSTestUtil.createFile(fs, tmpFile1, 1024, (short) 2, 0xDEADDEADl);
DFSTestUtil.createFile(fs, tmpFile2, 1024, (short) 2, 0xDEADDEADl);
// Make sure files are fully replicated before continuing
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
List<BlockLocation> list = Lists.newArrayList();
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile1, 0,
1024)));
list.addAll(Arrays.asList(fs.getFileBlockLocations(tmpFile2, 0,
1024)));
int totalRepl = 0;
for (BlockLocation loc : list) {
totalRepl += loc.getHosts().length;
}
if (totalRepl == 4) {
return true;
}
} catch(IOException e) {
// swallow
}
return false;
}
}, 500, 30000);
BlockLocation[] blockLocs1 = fs.getFileBlockLocations(tmpFile1, 0, 1024);
BlockLocation[] blockLocs2 = fs.getFileBlockLocations(tmpFile2, 0, 1024);
List<BlockLocation> allLocs = Lists.newArrayList();
allLocs.addAll(Arrays.asList(blockLocs1));
allLocs.addAll(Arrays.asList(blockLocs2));
// Stop a datanode to simulate a failure.
DataNodeProperties stoppedNode = cluster.stopDataNode(0);
// Fetch VolumeBlockLocations // Fetch VolumeBlockLocations
BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(Arrays BlockStorageLocation[] locs = fs.getFileBlockStorageLocations(allLocs);
.asList(blockLocs)); assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
assertEquals("Expected one HdfsBlockLocation for one 1-block file", 1,
locs.length); locs.length);
for (BlockStorageLocation l : locs) { for (BlockStorageLocation l : locs) {
assertEquals("Expected two replicas for each block", 2, assertEquals("Expected two replicas for each block", 2,
l.getHosts().length);
assertEquals("Expected two VolumeIDs for each block", 2,
l.getVolumeIds().length); l.getVolumeIds().length);
assertTrue("Expected one valid and one invalid replica", assertTrue("Expected one valid and one invalid volume",
(l.getVolumeIds()[0].isValid()) ^ (l.getVolumeIds()[1].isValid())); (l.getVolumeIds()[0] == null) ^ (l.getVolumeIds()[1] == null));
} }
// Start the datanode again, and remove one of the blocks.
// This is a different type of failure where the block itself
// is invalid.
cluster.restartDataNode(stoppedNode, true /*keepPort*/);
cluster.waitActive();
fs.delete(tmpFile2, true);
HATestUtil.waitForNNToIssueDeletions(cluster.getNameNode());
cluster.triggerHeartbeats();
HATestUtil.waitForDNDeletions(cluster);
locs = fs.getFileBlockStorageLocations(allLocs);
assertEquals("Expected two HdfsBlockLocations for two 1-block files", 2,
locs.length);
assertNotNull(locs[0].getVolumeIds()[0]);
assertNotNull(locs[0].getVolumeIds()[1]);
assertNull(locs[1].getVolumeIds()[0]);
assertNull(locs[1].getVolumeIds()[1]);
} finally { } finally {
cluster.shutdown(); if (cluster != null) {
cluster.shutdown();
}
} }
} }
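Aside (not part of this commit): a minimal client-side sketch of the convention the assertions above exercise. After HDFS-3969 an unknown volume is reported as a null VolumeId rather than an "invalid" one, so callers check for null. The NameNode URI and file path below are hypothetical.

import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class VolumeIdExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The datanode-side RPC backing getFileBlockStorageLocations must be on.
    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
    DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(
        URI.create("hdfs://namenode:8020"), conf);   // hypothetical NameNode

    Path file = new Path("/tmp/example.dat");        // hypothetical file
    BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, 1024);
    BlockStorageLocation[] locs =
        fs.getFileBlockStorageLocations(Arrays.asList(blocks));

    for (BlockStorageLocation loc : locs) {
      String[] names = loc.getNames();
      VolumeId[] ids = loc.getVolumeIds();
      for (int i = 0; i < ids.length; i++) {
        // A null entry means the volume for this replica is unknown, e.g.
        // the datanode was unreachable or the block no longer exists there.
        System.out.println(names[i] + " -> "
            + (ids[i] == null ? "unknown volume" : ids[i]));
      }
    }
    fs.close();
  }
}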

View File

@ -1049,7 +1049,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
} }
@Override @Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks) public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid, long[] blockIds)
throws IOException { throws IOException {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -111,7 +111,7 @@ public Boolean get() {
* Wait for the NameNode to issue any deletions that are already * Wait for the NameNode to issue any deletions that are already
* pending (i.e. for the pendingDeletionBlocksCount to go to 0) * pending (i.e. for the pendingDeletionBlocksCount to go to 0)
*/ */
static void waitForNNToIssueDeletions(final NameNode nn) public static void waitForNNToIssueDeletions(final NameNode nn)
throws Exception { throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override

View File

@ -176,6 +176,12 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle MAPREDUCE-5761. Added a simple log message to denote when encrypted shuffle
is on in the shuffle-handler. (Jian He via vinodkv) is on in the shuffle-handler. (Jian He via vinodkv)
MAPREDUCE-5754. Preserve Job diagnostics in history (Gera Shegalov via
jlowe)
MAPREDUCE-5766. Moved ping messages from TaskAttempts to be at DEBUG level
inside the ApplicationMaster log. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -360,12 +360,13 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
if (taskStatus == null) { if (taskStatus == null) {
//We are using statusUpdate only as a simple ping //We are using statusUpdate only as a simple ping
LOG.info("Ping from " + taskAttemptID.toString()); if (LOG.isDebugEnabled()) {
LOG.debug("Ping from " + taskAttemptID.toString());
}
return feedback; return feedback;
} }
// if we are here there is an actual status update to be processed // if we are here there is an actual status update to be processed
LOG.info("Status update from " + taskAttemptID.toString());
taskHeartbeatHandler.progressing(yarnAttemptID); taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus = TaskAttemptStatus taskAttemptStatus =
@ -453,7 +454,7 @@ public JvmTask getTask(JvmContext context) throws IOException {
JVMId jvmId = context.jvmId; JVMId jvmId = context.jvmId;
LOG.info("JVM with ID : " + jvmId + " asked for a task"); LOG.info("JVM with ID : " + jvmId + " asked for a task");
JvmTask jvmTask = null; JvmTask jvmTask = null;
// TODO: Is it an authorized container to get a task? Otherwise return null. // TODO: Is it an authorized container to get a task? Otherwise return null.

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@ -343,11 +344,12 @@ protected void serviceStop() throws Exception {
LOG.warn("Found jobId " + toClose LOG.warn("Found jobId " + toClose
+ " to have not been closed. Will close"); + " to have not been closed. Will close");
//Create a JobFinishEvent so that it is written to the job history //Create a JobFinishEvent so that it is written to the job history
final Job job = context.getJob(toClose);
JobUnsuccessfulCompletionEvent jucEvent = JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose), new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), context.getJob(toClose) System.currentTimeMillis(), job.getCompletedMaps(),
.getCompletedMaps(), context.getJob(toClose).getCompletedReduces(), job.getCompletedReduces(), JobState.KILLED.toString(),
JobState.KILLED.toString()); job.getDiagnostics());
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent); JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
//Bypass the queue mechanism which might wait. Call the method directly //Bypass the queue mechanism which might wait. Call the method directly
handleEvent(jfEvent); handleEvent(jfEvent);

View File

@ -149,6 +149,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Maximum no. of fetch-failure notifications after which map task is failed // Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
public static final String JOB_KILLED_DIAG =
"Job received Kill while in RUNNING state.";
//final fields //final fields
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
@ -1617,7 +1620,8 @@ private void unsuccessfulFinish(JobStateInternal finalState) {
finishTime, finishTime,
succeededMapTaskCount, succeededMapTaskCount,
succeededReduceTaskCount, succeededReduceTaskCount,
finalState.toString()); finalState.toString(),
diagnostics);
eventHandler.handle(new JobHistoryEvent(jobId, eventHandler.handle(new JobHistoryEvent(jobId,
unsuccessfulJobEvent)); unsuccessfulJobEvent));
finished(finalState); finished(finalState);
@ -1730,7 +1734,7 @@ public void transition(JobImpl job, JobEvent event) {
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime, 0, 0,
JobStateInternal.KILLED.toString()); JobStateInternal.KILLED.toString(), job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(JobStateInternal.KILLED); job.finished(JobStateInternal.KILLED);
} }
@ -1763,7 +1767,7 @@ private static class KillTasksTransition
implements SingleArcTransition<JobImpl, JobEvent> { implements SingleArcTransition<JobImpl, JobEvent> {
@Override @Override
public void transition(JobImpl job, JobEvent event) { public void transition(JobImpl job, JobEvent event) {
job.addDiagnostic("Job received Kill while in RUNNING state."); job.addDiagnostic(JOB_KILLED_DIAG);
for (Task task : job.tasks.values()) { for (Task task : job.tasks.values()) {
job.eventHandler.handle( job.eventHandler.handle(
new TaskEvent(task.getID(), TaskEventType.T_KILL)); new TaskEvent(task.getID(), TaskEventType.T_KILL));
@ -2127,7 +2131,7 @@ public void transition(JobImpl job, JobEvent event) {
JobUnsuccessfulCompletionEvent failedEvent = JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId, new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0, job.finishTime, 0, 0,
jobHistoryString); jobHistoryString, job.diagnostics);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
job.finished(terminationState); job.finished(terminationState);
} }

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.junit.Test; import org.junit.Test;
public class TestEvents { public class TestEvents {
@ -334,11 +335,12 @@ private FakeEvent getSetupAttemptStartedEvent() {
private FakeEvent getJobKilledEvent() { private FakeEvent getJobKilledEvent() {
FakeEvent result = new FakeEvent(EventType.JOB_KILLED); FakeEvent result = new FakeEvent(EventType.JOB_KILLED);
JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion(); JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
datum.finishedMaps = 1; datum.setFinishedMaps(1);
datum.finishedReduces = 2; datum.setFinishedReduces(2);
datum.finishTime = 3; datum.setFinishTime(3L);
datum.jobid = "ID"; datum.setJobid("ID");
datum.jobStatus = "STATUS"; datum.setJobStatus("STATUS");
datum.setDiagnostics(JobImpl.JOB_KILLED_DIAG);
result.setDatum(datum); result.setDatum(datum);
return result; return result;
} }

View File

@ -135,7 +135,8 @@
{"name": "finishTime", "type": "long"}, {"name": "finishTime", "type": "long"},
{"name": "finishedMaps", "type": "int"}, {"name": "finishedMaps", "type": "int"},
{"name": "finishedReduces", "type": "int"}, {"name": "finishedReduces", "type": "int"},
{"name": "jobStatus", "type": "string"} {"name": "jobStatus", "type": "string"},
{"name": "diagnostics", "type": "string"}
] ]
}, },
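Aside (not part of this commit): a short sketch of the regenerated Avro record carrying the new field, using only the accessors exercised by TestEvents above; the generated class is assumed to remain org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletion.

import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletion;

public class DiagnosticsDatumExample {
  public static void main(String[] args) {
    JobUnsuccessfulCompletion datum = new JobUnsuccessfulCompletion();
    datum.setJobid("job_1393307629410_0001");
    datum.setFinishTime(3L);
    datum.setFinishedMaps(1);
    datum.setFinishedReduces(2);
    datum.setJobStatus("KILLED");
    // New field added by the schema change above.
    datum.setDiagnostics("Job received Kill while in RUNNING state.");
    System.out.println(datum.getDiagnostics());
  }
}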

View File

@ -353,10 +353,6 @@ private void handleTaskFailedEvent(TaskFailedEvent event) {
taskInfo.error = StringInterner.weakIntern(event.getError()); taskInfo.error = StringInterner.weakIntern(event.getError());
taskInfo.failedDueToAttemptId = event.getFailedAttemptID(); taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
taskInfo.counters = event.getCounters(); taskInfo.counters = event.getCounters();
if (info.errorInfo.isEmpty()) {
info.errorInfo = "Task " + taskInfo.taskId + " failed " +
taskInfo.attemptsMap.size() + " times ";
}
} }
private void handleTaskStartedEvent(TaskStartedEvent event) { private void handleTaskStartedEvent(TaskStartedEvent event) {
@ -373,6 +369,7 @@ private void handleJobFailedEvent(JobUnsuccessfulCompletionEvent event) {
info.finishedMaps = event.getFinishedMaps(); info.finishedMaps = event.getFinishedMaps();
info.finishedReduces = event.getFinishedReduces(); info.finishedReduces = event.getFinishedReduces();
info.jobStatus = StringInterner.weakIntern(event.getStatus()); info.jobStatus = StringInterner.weakIntern(event.getStatus());
info.errorInfo = StringInterner.weakIntern(event.getDiagnostics());
} }
private void handleJobFinishedEvent(JobFinishedEvent event) { private void handleJobFinishedEvent(JobFinishedEvent event) {
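Aside (not part of this commit): a sketch of how a reader now sees the preserved diagnostics, mirroring the parser usage in the tests later in this change; the local .jhist path is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

public class ReadHistoryDiagnostics {
  public static void main(String[] args) throws Exception {
    Path histPath = new Path("/tmp/job_1393307629410_0001.jhist"); // hypothetical
    FileSystem lfs = FileSystem.getLocal(new Configuration());
    FSDataInputStream in = lfs.open(histPath);
    try {
      JobHistoryParser parser = new JobHistoryParser(in);
      JobInfo info = parser.parse();
      // errorInfo is now taken from the job-level diagnostics recorded in
      // JobUnsuccessfulCompletionEvent ("" when none were written).
      System.out.println(info.getJobId() + ": " + info.getErrorInfo());
    } finally {
      in.close();
    }
  }
}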

View File

@ -18,11 +18,15 @@
package org.apache.hadoop.mapreduce.jobhistory; package org.apache.hadoop.mapreduce.jobhistory;
import com.google.common.base.Joiner;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import java.util.Collections;
/** /**
* Event to record Failed and Killed completion of jobs * Event to record Failed and Killed completion of jobs
* *
@ -30,6 +34,10 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class JobUnsuccessfulCompletionEvent implements HistoryEvent { public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
private static final String NODIAGS = "";
private static final Iterable<String> NODIAGS_LIST =
Collections.singletonList(NODIAGS);
private JobUnsuccessfulCompletion datum private JobUnsuccessfulCompletion datum
= new JobUnsuccessfulCompletion(); = new JobUnsuccessfulCompletion();
@ -44,11 +52,33 @@ public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
public JobUnsuccessfulCompletionEvent(JobID id, long finishTime, public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
int finishedMaps, int finishedMaps,
int finishedReduces, String status) { int finishedReduces, String status) {
datum.jobid = new Utf8(id.toString()); this(id, finishTime, finishedMaps, finishedReduces, status, NODIAGS_LIST);
datum.finishTime = finishTime; }
datum.finishedMaps = finishedMaps;
datum.finishedReduces = finishedReduces; /**
datum.jobStatus = new Utf8(status); * Create an event to record unsuccessful completion (killed/failed) of jobs
* @param id Job ID
* @param finishTime Finish time of the job
* @param finishedMaps Number of finished maps
* @param finishedReduces Number of finished reduces
* @param status Status of the job
* @param diagnostics job runtime diagnostics
*/
public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
int finishedMaps,
int finishedReduces,
String status,
Iterable<String> diagnostics) {
datum.setJobid(new Utf8(id.toString()));
datum.setFinishTime(finishTime);
datum.setFinishedMaps(finishedMaps);
datum.setFinishedReduces(finishedReduces);
datum.setJobStatus(new Utf8(status));
if (diagnostics == null) {
diagnostics = NODIAGS_LIST;
}
datum.setDiagnostics(new Utf8(Joiner.on('\n').skipNulls()
.join(diagnostics)));
} }
JobUnsuccessfulCompletionEvent() {} JobUnsuccessfulCompletionEvent() {}
@ -61,13 +91,13 @@ public void setDatum(Object datum) {
/** Get the Job ID */ /** Get the Job ID */
public JobID getJobId() { return JobID.forName(datum.jobid.toString()); } public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
/** Get the job finish time */ /** Get the job finish time */
public long getFinishTime() { return datum.finishTime; } public long getFinishTime() { return datum.getFinishTime(); }
/** Get the number of finished maps */ /** Get the number of finished maps */
public int getFinishedMaps() { return datum.finishedMaps; } public int getFinishedMaps() { return datum.getFinishedMaps(); }
/** Get the number of finished reduces */ /** Get the number of finished reduces */
public int getFinishedReduces() { return datum.finishedReduces; } public int getFinishedReduces() { return datum.getFinishedReduces(); }
/** Get the status */ /** Get the status */
public String getStatus() { return datum.jobStatus.toString(); } public String getStatus() { return datum.getJobStatus().toString(); }
/** Get the event type */ /** Get the event type */
public EventType getEventType() { public EventType getEventType() {
if ("FAILED".equals(getStatus())) { if ("FAILED".equals(getStatus())) {
@ -78,4 +108,13 @@ public EventType getEventType() {
return EventType.JOB_KILLED; return EventType.JOB_KILLED;
} }
/**
* Retrieves diagnostics information preserved in the history file
*
* @return diagnostics as of the time of job termination
*/
public String getDiagnostics() {
final CharSequence diagnostics = datum.getDiagnostics();
return diagnostics == null ? NODIAGS : diagnostics.toString();
}
} }
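Aside (not part of this commit): a sketch of the new six-argument constructor and the matching getter added above; the job id and diagnostic string are illustrative.

import java.util.Collections;

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;

public class UnsuccessfulEventExample {
  public static void main(String[] args) {
    JobID jobId = JobID.forName("job_1393307629410_0001");
    // Diagnostics now travel with the completion event.
    JobUnsuccessfulCompletionEvent event = new JobUnsuccessfulCompletionEvent(
        jobId, System.currentTimeMillis(), 2, 1, "KILLED",
        Collections.singletonList("Job received Kill while in RUNNING state."));
    // Multiple lines are joined with '\n'; the old 5-arg constructor yields "".
    System.out.println(event.getDiagnostics());
  }
}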

View File

@ -73,7 +73,7 @@
<configuration> <configuration>
<excludes> <excludes>
<exclude>src/test/resources/job_1329348432655_0001_conf.xml</exclude> <exclude>src/test/resources/job_1329348432655_0001_conf.xml</exclude>
<exclude>src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist</exclude> <exclude>src/test/resources/*.jhist</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.mapreduce.v2.hs; package org.apache.hadoop.mapreduce.v2.hs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -25,6 +29,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -36,9 +41,9 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
@ -53,6 +58,7 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
@ -66,8 +72,11 @@
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl; import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
@ -149,7 +158,7 @@ private void checkHistoryParsing(final int numMaps, final int numReduces,
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis(); long amStartTimeEst = System.currentTimeMillis();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass() MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
@ -390,7 +399,7 @@ public void testHistoryParsingForFailedAttempts() throws Exception {
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
@ -455,7 +464,7 @@ public void testCountersForFailedTask() throws Exception {
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
@ -499,18 +508,85 @@ public void testCountersForFailedTask() throws Exception {
Assert.assertNotNull("completed task report has null counters", ct Assert.assertNotNull("completed task report has null counters", ct
.getReport().getCounters()); .getReport().getCounters());
} }
final List<String> originalDiagnostics = job.getDiagnostics();
final String historyError = jobInfo.getErrorInfo();
assertTrue("No original diagnostics for a failed job",
originalDiagnostics != null && !originalDiagnostics.isEmpty());
assertNotNull("No history error info for a failed job ", historyError);
for (String diagString : originalDiagnostics) {
assertTrue(historyError.contains(diagString));
}
} finally { } finally {
LOG.info("FINISHED testCountersForFailedTask"); LOG.info("FINISHED testCountersForFailedTask");
} }
} }
@Test(timeout = 60000)
public void testDiagnosticsForKilledJob() throws Exception {
LOG.info("STARTING testDiagnosticsForKilledJob");
try {
final Configuration conf = new Configuration();
conf.setClass(
NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
MRApp app = new MRAppWithHistoryWithJobKilled(2, 1, true, this
.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
app.waitForState(job, JobState.KILLED);
// make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobHistoryParser parser;
JobInfo jobInfo;
synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
in = fc.open(fc.makeQualified(historyFilePath));
} catch (IOException ioe) {
LOG.info("Can not open history file: " + historyFilePath, ioe);
throw (new Exception("Can not open History File"));
}
parser = new JobHistoryParser(in);
jobInfo = parser.parse();
}
Exception parseException = parser.getParseException();
assertNull("Caught an expected exception " + parseException,
parseException);
final List<String> originalDiagnostics = job.getDiagnostics();
final String historyError = jobInfo.getErrorInfo();
assertTrue("No original diagnostics for a failed job",
originalDiagnostics != null && !originalDiagnostics.isEmpty());
assertNotNull("No history error info for a failed job ", historyError);
for (String diagString : originalDiagnostics) {
assertTrue(historyError.contains(diagString));
}
assertTrue("No killed message in diagnostics",
historyError.contains(JobImpl.JOB_KILLED_DIAG));
} finally {
LOG.info("FINISHED testDiagnosticsForKilledJob");
}
}
@Test(timeout = 50000) @Test(timeout = 50000)
public void testScanningOldDirs() throws Exception { public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs"); LOG.info("STARTING testScanningOldDirs");
try { try {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(), MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
@ -590,6 +666,27 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
} }
} }
static class MRAppWithHistoryWithJobKilled extends MRAppWithHistory {
public MRAppWithHistoryWithJobKilled(int maps, int reduces,
boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
if (attemptID.getTaskId().getId() == 0) {
getContext().getEventHandler().handle(
new JobEvent(attemptID.getTaskId().getJobId(),
JobEventType.JOB_KILL));
} else {
getContext().getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
}
}
}
static class HistoryFileManagerForTest extends HistoryFileManager { static class HistoryFileManagerForTest extends HistoryFileManager {
void deleteJobFromJobListCache(HistoryFileInfo fileInfo) { void deleteJobFromJobListCache(HistoryFileInfo fileInfo) {
jobListCache.delete(fileInfo); jobListCache.delete(fileInfo);
@ -613,7 +710,7 @@ public void testDeleteFileInfo() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setClass( conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf); RackResolver.init(conf);
@ -668,7 +765,7 @@ public void testJobHistoryMethods() throws Exception {
Configuration configuration = new Configuration(); Configuration configuration = new Configuration();
configuration configuration
.setClass( .setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class); MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(configuration); RackResolver.init(configuration);
@ -743,7 +840,7 @@ public void testMultipleFailedTasks() throws Exception {
final org.apache.hadoop.mapreduce.TaskType taskType = final org.apache.hadoop.mapreduce.TaskType taskType =
org.apache.hadoop.mapreduce.TaskType.MAP; org.apache.hadoop.mapreduce.TaskType.MAP;
final TaskID[] tids = new TaskID[2]; final TaskID[] tids = new TaskID[2];
JobID jid = new JobID("1", 1); final JobID jid = new JobID("1", 1);
tids[0] = new TaskID(jid, taskType, 0); tids[0] = new TaskID(jid, taskType, 0);
tids[1] = new TaskID(jid, taskType, 1); tids[1] = new TaskID(jid, taskType, 1);
Mockito.when(reader.getNextEvent()).thenAnswer( Mockito.when(reader.getNextEvent()).thenAnswer(
@ -762,6 +859,13 @@ public HistoryEvent answer(InvocationOnMock invocation)
tfe.setDatum(tfe.getDatum()); tfe.setDatum(tfe.getDatum());
return tfe; return tfe;
} }
if (eventId < 5) {
JobUnsuccessfulCompletionEvent juce =
new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
"JOB_FAILED", Collections.singletonList(
"Task failed: " + tids[0].toString()));
return juce;
}
return null; return null;
} }
}); });
@ -769,4 +873,22 @@ public HistoryEvent answer(InvocationOnMock invocation)
assertTrue("Task 0 not implicated", assertTrue("Task 0 not implicated",
info.getErrorInfo().contains(tids[0].toString())); info.getErrorInfo().contains(tids[0].toString()));
} }
@Test
public void testFailedJobHistoryWithoutDiagnostics() throws Exception {
final Path histPath = new Path(getClass().getClassLoader().getResource(
"job_1393307629410_0001-1393307687476-user-Sleep+job-1393307723835-0-0-FAILED-default-1393307693920.jhist")
.getFile());
final FileSystem lfs = FileSystem.getLocal(new Configuration());
final FSDataInputStream fsdis = lfs.open(histPath);
try {
JobHistoryParser parser = new JobHistoryParser(fsdis);
JobInfo info = parser.parse();
assertEquals("History parsed jobId incorrectly",
info.getJobId(), JobID.forName("job_1393307629410_0001") );
assertEquals("Default diagnostics incorrect ", "", info.getErrorInfo());
} finally {
fsdis.close();
}
}
} }

View File

@ -32,6 +32,9 @@ Release 2.5.0 - UNRELEASED
YARN-1561. Fix a generic type warning in FairScheduler. (Chen He via junping_du) YARN-1561. Fix a generic type warning in FairScheduler. (Chen He via junping_du)
YARN-1429. *nix: Allow a way for users to augment classpath of YARN daemons.
(Jarek Jarcec Cecho via kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -235,6 +238,9 @@ Release 2.4.0 - UNRELEASED
YARN-1749. Updated application-history related configs to reflect the latest YARN-1749. Updated application-history related configs to reflect the latest
reality and to be consistently named. (Zhijie Shen via vinodkv) reality and to be consistently named. (Zhijie Shen via vinodkv)
YARN-1301. Added the INFO level log of the non-empty blacklist additions
and removals inside ApplicationMasterService. (Tsuyoshi Ozawa via zjshen)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -22,7 +22,12 @@
# #
# JAVA_HOME The java implementation to use. Overrides JAVA_HOME. # JAVA_HOME The java implementation to use. Overrides JAVA_HOME.
# #
# YARN_CLASSPATH Extra Java CLASSPATH entries. # YARN_USER_CLASSPATH Additional user CLASSPATH entries.
#
# YARN_USER_CLASSPATH_FIRST If set to a non-empty value, the entries in
# YARN_USER_CLASSPATH will be prepended to
# YARN's final classpath instead of being
# appended at the end.
# #
# YARN_HEAPSIZE The maximum amount of heap to use, in MB. # YARN_HEAPSIZE The maximum amount of heap to use, in MB.
# Default is 1000. # Default is 1000.
@ -163,6 +168,17 @@ fi
CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_DIR}/* CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_DIR}/*
CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_LIB_JARS_DIR}/* CLASSPATH=${CLASSPATH}:$HADOOP_YARN_HOME/${YARN_LIB_JARS_DIR}/*
# Add user defined YARN_USER_CLASSPATH to the class path (if defined)
if [ -n "$YARN_USER_CLASSPATH" ]; then
if [ -n "$YARN_USER_CLASSPATH_FIRST" ]; then
# User requested to add the custom entries at the beginning
CLASSPATH=${YARN_USER_CLASSPATH}:${CLASSPATH}
else
# By default we will just append the extra entries at the end
CLASSPATH=${CLASSPATH}:${YARN_USER_CLASSPATH}
fi
fi
# so that filenames w/ spaces are handled correctly in loops below # so that filenames w/ spaces are handled correctly in loops below
IFS= IFS=
@ -249,4 +265,3 @@ if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
fi fi
exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@" exec "$JAVA" -Dproc_$COMMAND $JAVA_HEAP_MAX $YARN_OPTS -classpath "$CLASSPATH" $CLASS "$@"
fi

View File

@ -22,6 +22,7 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -447,10 +448,10 @@ public AllocateResponse allocate(AllocateRequest request)
request.getResourceBlacklistRequest(); request.getResourceBlacklistRequest();
List<String> blacklistAdditions = List<String> blacklistAdditions =
(blacklistRequest != null) ? (blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : null; blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
List<String> blacklistRemovals = List<String> blacklistRemovals =
(blacklistRequest != null) ? (blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : null; blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
// sanity check // sanity check
try { try {
@ -487,6 +488,11 @@ public AllocateResponse allocate(AllocateRequest request)
this.rScheduler.allocate(appAttemptId, ask, release, this.rScheduler.allocate(appAttemptId, ask, release,
blacklistAdditions, blacklistRemovals); blacklistAdditions, blacklistRemovals);
if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
LOG.info("blacklist are updated in Scheduler." +
"blacklistAdditions: " + blacklistAdditions + ", " +
"blacklistRemovals: " + blacklistRemovals);
}
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse = AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
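Aside (not part of this commit): a sketch of the AM-side heartbeat whose non-empty blacklist the new log statement reports. The helper method is hypothetical, the proxy is assumed to be an already-registered ApplicationMasterProtocol, and the record newInstance factories are assumed to have their usual 2.x shapes.

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class BlacklistAllocateExample {
  /** Sends a heartbeat that only updates the node blacklist. */
  static AllocateResponse blacklistNode(ApplicationMasterProtocol am,
      int responseId, String badNode) throws YarnException, IOException {
    ResourceBlacklistRequest blacklist = ResourceBlacklistRequest.newInstance(
        Arrays.asList(badNode),                    // additions
        Collections.<String>emptyList());          // removals
    AllocateRequest request = AllocateRequest.newInstance(
        responseId, 0.5f,
        Collections.<ResourceRequest>emptyList(),  // no new asks
        Collections.<ContainerId>emptyList(),      // nothing to release
        blacklist);
    // ApplicationMasterService now logs the non-empty additions/removals at INFO.
    return am.allocate(request);
  }
}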