Merge trunk into auto-HA branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1339410 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-05-16 22:47:37 +00:00
commit e14e10fb86
75 changed files with 2165 additions and 1253 deletions

View File

@ -84,7 +84,7 @@ public class KerberosName {
try {
defaultRealm = KerberosUtil.getDefaultRealm();
} catch (Exception ke) {
LOG.warn("Kerberos krb5 configuration not found, setting default realm to empty");
LOG.debug("Kerberos krb5 configuration not found, setting default realm to empty");
defaultRealm="";
}
}

View File

@ -65,6 +65,8 @@ Trunk (unreleased changes)
HADOOP-8308. Support cross-project Jenkins builds. (tomwhite)
HADOOP-8297. Writable javadocs don't carry default constructor (harsh)
BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
@ -140,15 +142,45 @@ Trunk (unreleased changes)
HADOOP-7761. Improve the performance of raw comparisons. (todd)
Release 2.0.0 - UNRELEASED
Release 2.0.1-alpha - UNRELEASED
INCOMPATIBLE CHANGES
HADOOP-8388. Remove unused BlockLocation serialization.
(Colin Patrick McCabe via eli)
NEW FEATURES
IMPROVEMENTS
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
final release. (todd)
HADOOP-8361. Avoid out-of-memory problems when deserializing strings.
(Colin Patrick McCabe via eli)
HADOOP-8224. Don't hardcode hdfs.audit.logger in the scripts.
(Tomohiko Kinebuchi via eli)
HADOOP-8398. Cleanup BlockLocation. (eli)
BUG FIXES
HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
starting with a numeric character. (Junping Du via suresh)
HADOOP-8393. hadoop-config.sh missing variable exports, causes Yarn jobs to fail with ClassNotFoundException MRAppMaster. (phunt via tucu)
HADOOP-8316. Audit logging should be disabled by default. (eli)
HADOOP-8400. All commands warn "Kerberos krb5 configuration not found" when security is not enabled. (tucu)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES
HADOOP-7920. Remove Avro Rpc. (suresh)
HADOOP-8388. Remove unused BlockLocation serialization.
(Colin Patrick McCabe via eli)
NEW FEATURES
HADOOP-7773. Add support for protocol buffer based RPC engine.
@ -306,18 +338,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8356. FileSystem service loading mechanism should print the FileSystem
impl it is failing to load (tucu)
HADOOP-8340. SNAPSHOT build versions should compare as less than their eventual
final release. (todd)
HADOOP-8361. Avoid out-of-memory problems when deserializing strings.
(Colin Patrick McCabe via eli)
HADOOP-8353. hadoop-daemon.sh and yarn-daemon.sh can be misleading on stop.
(Roman Shaposhnik via atm)
HADOOP-8224. Don't hardcode hdfs.audit.logger in the scripts.
(Tomohiko Kinebuchi via eli)
HADOOP-8113. Correction to BUILDING.txt: HDFS needs ProtocolBuffer, too
(not just MapReduce). Contributed by Eugene Koontz.
@ -458,9 +481,6 @@ Release 2.0.0 - UNRELEASED
HADOOP-8359. Fix javadoc warnings in Configuration. (Anupam Seth via
szetszwo)
HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
starting with a numeric character. (Junping Du via suresh)
BREAKDOWN OF HADOOP-7454 SUBTASKS
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
@ -522,8 +542,6 @@ Release 2.0.0 - UNRELEASED
HADOOP-7868. Hadoop native fails to compile when default linker
option is -Wl,--as-needed. (Trevor Robinson via eli)
HADOOP-8316. Audit logging should be disabled by default. (eli)
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -172,7 +172,7 @@ IFS=
if [ "$HADOOP_COMMON_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$HADOOP_COMMON_DIR" ]; then
HADOOP_COMMON_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
fi
fi
@ -252,7 +252,7 @@ HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
# put hdfs in classpath if present
if [ "$HADOOP_HDFS_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$HDFS_DIR" ]; then
HADOOP_HDFS_HOME=$HADOOP_PREFIX
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
fi
fi
@ -269,7 +269,7 @@ CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/$HDFS_DIR'/*'
# put yarn in classpath if present
if [ "$YARN_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$YARN_DIR" ]; then
YARN_HOME=$HADOOP_PREFIX
export YARN_HOME=$HADOOP_PREFIX
fi
fi
@ -286,7 +286,7 @@ CLASSPATH=${CLASSPATH}:$YARN_HOME/$YARN_DIR'/*'
# put mapred in classpath if present AND different from YARN
if [ "$HADOOP_MAPRED_HOME" = "" ]; then
if [ -d "${HADOOP_PREFIX}/$MAPRED_DIR" ]; then
HADOOP_MAPRED_HOME=$HADOOP_PREFIX
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
fi
fi

View File

@ -17,29 +17,23 @@
*/
package org.apache.hadoop.fs;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
/*
* A BlockLocation lists hosts, offset and length
* of block.
*
/**
* Represents the network location of a block, information about the hosts
* that contain block replicas, and other block metadata (E.g. the file
* offset associated with the block, length, whether it is corrupt, etc).
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class BlockLocation {
private String[] hosts; //hostnames of datanodes
private String[] names; //hostname:portNumber of datanodes
private String[] topologyPaths; // full path name in network topology
private long offset; //offset of the of the block in the file
private String[] hosts; // Datanode hostnames
private String[] names; // Datanode IP:xferPort for accessing the block
private String[] topologyPaths; // Full path name in network topology
private long offset; // Offset of the block in the file
private long length;
private boolean corrupt;
@ -105,7 +99,7 @@ public BlockLocation(String[] names, String[] hosts, String[] topologyPaths,
* Get the list of hosts (hostname) hosting this block
*/
public String[] getHosts() throws IOException {
if ((hosts == null) || (hosts.length == 0)) {
if (hosts == null || hosts.length == 0) {
return new String[0];
} else {
return hosts;
@ -113,25 +107,25 @@ public String[] getHosts() throws IOException {
}
/**
* Get the list of names (hostname:port) hosting this block
* Get the list of names (IP:xferPort) hosting this block
*/
public String[] getNames() throws IOException {
if ((names == null) || (names.length == 0)) {
if (names == null || names.length == 0) {
return new String[0];
} else {
return this.names;
return names;
}
}
/**
* Get the list of network topology paths for each of the hosts.
* The last component of the path is the host.
* The last component of the path is the "name" (IP:xferPort).
*/
public String[] getTopologyPaths() throws IOException {
if ((topologyPaths == null) || (topologyPaths.length == 0)) {
if (topologyPaths == null || topologyPaths.length == 0) {
return new String[0];
} else {
return this.topologyPaths;
return topologyPaths;
}
}
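The hunk above tightens the BlockLocation comments: getHosts() returns datanode hostnames, getNames() returns IP:xferPort pairs, and getTopologyPaths() returns full rack paths whose last component is that name. A small usage sketch, separate from the diff, showing how client code typically obtains and reads these values; the file path and default configuration are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/example/data.txt"); // hypothetical file
    FileStatus status = fs.getFileStatus(path);
    // Ask the NameNode which datanodes hold each block of the file.
    BlockLocation[] blocks =
        fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation blk : blocks) {
      System.out.println("offset=" + blk.getOffset()
          + " length=" + blk.getLength());
      for (String host : blk.getHosts()) {          // datanode hostnames
        System.out.println("  host: " + host);
      }
      for (String name : blk.getNames()) {          // IP:xferPort pairs
        System.out.println("  name: " + name);
      }
      for (String topo : blk.getTopologyPaths()) {  // rack path + name
        System.out.println("  topology: " + topo);
      }
    }
    fs.close();
  }
}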

View File

@ -39,20 +39,23 @@
* <p>Example:</p>
* <p><blockquote><pre>
* public class MyWritable implements Writable {
* // Some data
* // Some data
* private int counter;
* private long timestamp;
*
*
* // Default constructor to allow (de)serialization
* MyWritable() { }
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
*
* public static MyWritable read(DataInput in) throws IOException {
* MyWritable w = new MyWritable();
* w.readFields(in);

View File

@ -40,6 +40,7 @@ public interface Node {
* @param location the location
*/
public void setNetworkLocation(String location);
/** @return this node's name */
public String getName();

View File

@ -110,7 +110,7 @@ private void set(String name, String location) {
* @return the path of a node
*/
public static String getPath(Node node) {
return node.getNetworkLocation()+PATH_SEPARATOR_STR+node.getName();
return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
}
/** @return this node's path as its string representation */

View File

@ -77,6 +77,8 @@ Trunk (unreleased changes)
HDFS-3293. Add toString(), equals(..) and hashCode() to JournalInfo.
(Hari Mankude via szetszwo)
HDFS-3197. Incorrect class comments in a few tests. (Andy Isaacson via eli)
OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
@ -109,9 +111,6 @@ Trunk (unreleased changes)
HDFS-2776. Missing interface annotation on JournalSet.
(Brandon Li via jitendra)
HDFS-2759. Pre-allocate HDFS edit log files after writing version number.
(atm)
HDFS-2908. Add apache license header for StorageReport.java. (Brandon Li
via jitendra)
@ -141,8 +140,73 @@ Trunk (unreleased changes)
HDFS-3243. TestParallelRead timing out on jenkins. (Henry Robinson via todd)
HDFS-3265. PowerPc Build error. (Kumar Ravi via mattf)
HDFS-2312. FSNamesystem javadoc incorrectly says its for DNs. (harsh)
HDFS-3163. TestHDFSCLI.testAll fails if the user name is not all lowercase.
(Brandon Li via atm)
Release 2.0.0 - UNRELEASED
Release 2.0.1-alpha - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
logging is enabled. (atm)
HDFS-3341. Change minimum RPC versions to respective SNAPSHOTs instead of
final releases. (todd)
HDFS-3369. Rename {get|set|add}INode(..) methods in BlockManager and
BlocksMap to {get|set|add}BlockCollection(..). (John George via szetszwo)
HDFS-3134. harden edit log loader against malformed or malicious input.
(Colin Patrick McCabe via eli)
HDFS-3230. Cleanup DatanodeID creation in the tests. (eli)
HDFS-3401. Cleanup DatanodeDescriptor creation in the tests. (eli)
HDFS-3400. DNs should be able start with jsvc even if security is disabled.
(atm via eli)
HDFS-3404. Make putImage in GetImageServlet infer remote address to fetch
from request. (atm)
HDFS-3335. check for edit log corruption at the end of the log
(Colin Patrick McCabe via todd)
HDFS-3417. Rename BalancerDatanode#getName to getDisplayName to be
consistent with Datanode. (eli)
HDFS-3416. Cleanup DatanodeID and DatanodeRegistration
constructors used by testing. (eli)
HDFS-3419. Cleanup LocatedBlock. (eli)
OPTIMIZATIONS
BUG FIXES
HDFS-3385. The last block of INodeFileUnderConstruction is not
necessarily a BlockInfoUnderConstruction, so do not cast it in
FSNamesystem.recoverLeaseInternal(..). (szetszwo)
HDFS-3414. Balancer does not find NameNode if rpc-address or
servicerpc-address are not set in client configs. (atm)
HDFS-3031. Fix complete() and getAdditionalBlock() RPCs to be idempotent
(todd)
HDFS-2759. Pre-allocate HDFS edit log files after writing version number.
(atm)
HDFS-3413. TestFailureToReadEdits timing out. (atm)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@ -432,28 +496,12 @@ Release 2.0.0 - UNRELEASED
so that INodeFile and INodeFileUnderConstruction do not have to be used in
block management. (John George via szetszwo)
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
logging is enabled. (atm)
HDFS-3341. Change minimum RPC versions to respective SNAPSHOTs instead of
final releases. (todd)
HDFS-3369. Rename {get|set|add}INode(..) methods in BlockManager and
BlocksMap to {get|set|add}BlockCollection(..). (John George via szetszwo)
HDFS-3134. harden edit log loader against malformed or malicious input.
(Colin Patrick McCabe via eli)
HDFS-3230. Cleanup DatanodeID creation in the tests. (eli)
HDFS-3401. Cleanup DatanodeDescriptor creation in the tests. (eli)
HDFS-3400. DNs should be able start with jsvc even if security is disabled.
(atm via eli)
HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
and epoch in JournalProtocol. (suresh via szetszwo)
HDFS-3418. Rename BlockWithLocationsProto datanodeIDs field to storageIDs.
(eli)
OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@ -647,10 +695,6 @@ Release 2.0.0 - UNRELEASED
HDFS-3395. NN doesn't start with HA+security enabled and HTTP address
set to 0.0.0.0. (atm)
HDFS-3385. The last block of INodeFileUnderConstruction is not
necessarily a BlockInfoUnderConstruction, so do not cast it in
FSNamesystem.recoverLeaseInternal(..). (szetszwo)
HDFS-3026. HA: Handle failure during HA state transition. (atm)
BREAKDOWN OF HDFS-1623 SUBTASKS

View File

@ -965,6 +965,7 @@ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
DatanodeInfo[] nodes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
ExtendedBlock oldBlock = block;
do {
hasError = false;
lastException = null;
@ -972,9 +973,11 @@ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
success = false;
long startTime = System.currentTimeMillis();
DatanodeInfo[] w = excludedNodes.toArray(
DatanodeInfo[] excluded = excludedNodes.toArray(
new DatanodeInfo[excludedNodes.size()]);
lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
block = oldBlock;
lb = locateFollowingBlock(startTime,
excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getBlockToken();

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -608,6 +609,14 @@ public static Collection<URI> getNsServiceRpcUris(Configuration conf) {
public static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) {
Set<URI> ret = new HashSet<URI>();
// We're passed multiple possible configuration keys for any given NN or HA
// nameservice, and search the config in order of these keys. In order to
// make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a
// URI for a config key for which we've already found a preferred entry, we
// keep track of non-preferred keys here.
Set<URI> nonPreferredUris = new HashSet<URI>();
for (String nsId : getNameServiceIds(conf)) {
if (HAUtil.isHAEnabled(conf, nsId)) {
// Add the logical URI of the nameservice.
@ -618,24 +627,46 @@ public static Collection<URI> getNameServiceUris(Configuration conf,
}
} else {
// Add the URI corresponding to the address of the NN.
boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(concatSuffixes(key, nsId));
if (addr != null) {
ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
NetUtils.createSocketAddr(addr)));
break;
URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
NetUtils.createSocketAddr(addr));
if (!uriFound) {
uriFound = true;
ret.add(uri);
} else {
nonPreferredUris.add(uri);
}
}
}
}
}
// Add the generic configuration keys.
boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(key);
if (addr != null) {
ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
break;
URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr));
if (!uriFound) {
uriFound = true;
ret.add(uri);
} else {
nonPreferredUris.add(uri);
}
}
}
// Add the default URI if it is an HDFS URI.
URI defaultUri = FileSystem.getDefaultUri(conf);
if (defaultUri != null &&
HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
!nonPreferredUris.contains(defaultUri)) {
ret.add(defaultUri);
}
return ret;
}
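The comment added above describes the key-preference rule in getNameServiceUris(): for each nameservice, and again for the generic keys, the keys are consulted in order, the first hit is kept, and later hits are remembered as non-preferred so the default URI is only added when it does not duplicate one of them. A stripped-down sketch of that pattern, with a plain string map standing in for Configuration and hypothetical key names:

import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class PreferredKeyLookup {
  /** Keep the value of the first configured key; remember values found under
   *  lower-priority keys so a later "default" entry is not added twice. */
  static Collection<String> resolve(Map<String, String> conf,
                                    String[] keysInPreferenceOrder,
                                    String defaultKey) {
    Set<String> result = new HashSet<String>();
    Set<String> nonPreferred = new HashSet<String>();
    boolean found = false;
    for (String key : keysInPreferenceOrder) {
      String value = conf.get(key);
      if (value == null) {
        continue;
      }
      if (!found) {
        found = true;            // first configured key wins
        result.add(value);
      } else {
        nonPreferred.add(value); // remember, but do not return
      }
    }
    // Only add the default when it does not shadow a non-preferred value.
    String dflt = conf.get(defaultKey);
    if (dflt != null && !nonPreferred.contains(dflt)) {
      result.add(dflt);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<String, String>();
    conf.put("service.rpc-address", "nn.example.com:8021");
    conf.put("rpc-address", "nn.example.com:8020");
    conf.put("fs.defaultFS", "nn.example.com:8020");
    // The service address is preferred; the client address is remembered as
    // non-preferred, so the identical default is not added again.
    System.out.println(resolve(conf,
        new String[] {"service.rpc-address", "rpc-address"}, "fs.defaultFS"));
  }
}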

View File

@ -214,6 +214,17 @@ public boolean equals(Object o) {
}
return compareTo((Block)o) == 0;
}
/**
* @return true if the two blocks have the same block ID and the same
* generation stamp, or if both blocks are null.
*/
public static boolean matchingIdAndGenStamp(Block a, Block b) {
if (a == b) return true; // same block, or both null
if (a == null || b == null) return false; // only one null
return a.blockId == b.blockId &&
a.generationStamp == b.generationStamp;
}
@Override // Object
public int hashCode() {
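A short sketch, outside the diff, exercising the cases spelled out in the matchingIdAndGenStamp() javadoc; it assumes the (blockId, numBytes, generationStamp) Block constructor:

import org.apache.hadoop.hdfs.protocol.Block;

public class MatchingIdAndGenStampExample {
  public static void main(String[] args) {
    Block a = new Block(1234L, 0L, 7L);                 // id=1234, genstamp=7
    Block sameIdAndStamp = new Block(1234L, 4096L, 7L); // only length differs
    Block newerStamp = new Block(1234L, 0L, 8L);        // genstamp bumped

    // Both null, or the same reference: trivially a match.
    System.out.println(Block.matchingIdAndGenStamp(null, null));          // true
    // Exactly one null: not a match.
    System.out.println(Block.matchingIdAndGenStamp(a, null));             // false
    // Same block ID and generation stamp: a match, length is ignored.
    System.out.println(Block.matchingIdAndGenStamp(a, sameIdAndStamp));   // true
    // Generation stamp changed (e.g. by pipeline recovery): not a match.
    System.out.println(Block.matchingIdAndGenStamp(a, newerStamp));       // false
  }
}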

View File

@ -309,6 +309,7 @@ public void abandonBlock(ExtendedBlock b, String src, String holder)
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Idempotent
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
throws AccessControlException, FileNotFoundException,
@ -362,6 +363,7 @@ public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
@Idempotent
public boolean complete(String src, String clientName, ExtendedBlock last)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException;

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
* This class represents the primary identifier for a Datanode.
@ -45,23 +44,6 @@ public class DatanodeID implements Comparable<DatanodeID> {
protected int infoPort; // info server port
protected int ipcPort; // IPC server port
public DatanodeID(String ipAddr, int xferPort) {
this(ipAddr, "", "", xferPort,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
public DatanodeID(String ipAddr, String hostName, int xferPort) {
this(ipAddr, hostName, "", xferPort,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
/**
* DatanodeID copy constructor
*
* @param from
*/
public DatanodeID(DatanodeID from) {
this(from.getIpAddr(),
from.getHostName(),
@ -72,7 +54,7 @@ public DatanodeID(DatanodeID from) {
}
/**
* Create DatanodeID
* Create a DatanodeID
* @param ipAddr IP
* @param hostName hostname
* @param storageID data storage ID
@ -94,22 +76,6 @@ public void setIpAddr(String ipAddr) {
this.ipAddr = ipAddr;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public void setXferPort(int xferPort) {
this.xferPort = xferPort;
}
public void setInfoPort(int infoPort) {
this.infoPort = infoPort;
}
public void setIpcPort(int ipcPort) {
this.ipcPort = ipcPort;
}
public void setStorageID(String storageID) {
this.storageID = storageID;
}

View File

@ -22,11 +22,11 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
/****************************************************
* A LocatedBlock is a pair of Block, DatanodeInfo[]
* objects. It tells where to find a Block.
*
****************************************************/
/**
* Associates a block with the Datanodes that contain its replicas
* and other block metadata (E.g. the file offset associated with this
* block, whether it is corrupt, security token, etc).
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class LocatedBlock {
@ -40,19 +40,6 @@ public class LocatedBlock {
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
public LocatedBlock() {
this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
}
public LocatedBlock(ExtendedBlock eb) {
this(eb, new DatanodeInfo[0], 0L, false);
}
public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1, false); // startOffset is unknown
}
@ -81,14 +68,10 @@ public void setBlockToken(Token<BlockTokenIdentifier> token) {
this.blockToken = token;
}
/**
*/
public ExtendedBlock getBlock() {
return b;
}
/**
*/
public DatanodeInfo[] getLocations() {
return locs;
}

View File

@ -105,8 +105,9 @@ public boolean isUnderConstruction() {
* @return block if found, or null otherwise.
*/
public int findBlock(long offset) {
// create fake block of size 1 as a key
LocatedBlock key = new LocatedBlock();
// create fake block of size 0 as a key
LocatedBlock key = new LocatedBlock(
new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
key.setStartOffset(offset);
key.getBlock().setNumBytes(1);
Comparator<LocatedBlock> comp =

View File

@ -254,11 +254,11 @@ public static Block convert(BlockProto b) {
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock()))
.addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build();
.addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList()
return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList()
.toArray(new String[0]));
}

View File

@ -205,6 +205,7 @@ public class Balancer {
private Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>();
private MovedBlocks movedBlocks = new MovedBlocks();
// Map storage IDs to BalancerDatanodes
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
@ -262,9 +263,9 @@ private boolean markMovedIfGoodBlock(BalancerBlock block) {
if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move block "+ block.getBlockId()
+" with a length of "+StringUtils.byteDesc(block.getNumBytes())
+ " bytes from " + source.getName()
+ " to " + target.getName()
+ " using proxy source " + proxySource.getName() );
+ " bytes from " + source.getDisplayName()
+ " to " + target.getDisplayName()
+ " using proxy source " + proxySource.getDisplayName() );
}
return true;
}
@ -317,15 +318,15 @@ private void dispatch() {
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
LOG.info( "Moving block " + block.getBlock().getBlockId() +
" from "+ source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
" from "+ source.getDisplayName() + " to " +
target.getDisplayName() + " through " +
proxySource.getDisplayName() +
" is succeeded." );
} catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+
" from " + source.getName() + " to " +
target.getName() + " through " +
proxySource.getName() +
" from " + source.getDisplayName() + " to " +
target.getDisplayName() + " through " +
proxySource.getDisplayName() +
": "+e.getMessage());
} finally {
IOUtils.closeStream(out);
@ -378,7 +379,8 @@ private void scheduleBlockMove() {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting moving "+ block.getBlockId() +
" from " + proxySource.getName() + " to " + target.getName());
" from " + proxySource.getDisplayName() + " to " +
target.getDisplayName());
}
dispatch();
}
@ -475,7 +477,7 @@ private static class BalancerDatanode {
@Override
public String toString() {
return getClass().getSimpleName() + "[" + getName()
return getClass().getSimpleName() + "[" + datanode
+ ", utilization=" + utilization + "]";
}
@ -507,8 +509,8 @@ protected DatanodeInfo getDatanode() {
}
/** Get the name of the datanode */
protected String getName() {
return datanode.getName();
protected String getDisplayName() {
return datanode.toString();
}
/* Get the storage id of the datanode */
@ -620,8 +622,8 @@ private long getBlockList() throws IOException {
synchronized (block) {
// update locations
for ( String location : blk.getDatanodes() ) {
BalancerDatanode datanode = datanodes.get(location);
for ( String storageID : blk.getStorageIDs() ) {
BalancerDatanode datanode = datanodes.get(storageID);
if (datanode != null) { // not an unknown datanode
block.addLocation(datanode);
}
@ -831,7 +833,7 @@ private long initNodes(DatanodeInfo[] datanodes) {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
} else {
assert(isOverUtilized(datanodeS)) :
datanodeS.getName()+ "is not an overUtilized node";
datanodeS.getDisplayName()+ "is not an overUtilized node";
this.overUtilizedDatanodes.add((Source)datanodeS);
overLoadedBytes += (long)((datanodeS.utilization-avg
-threshold)*datanodeS.datanode.getCapacity()/100.0);
@ -842,7 +844,7 @@ private long initNodes(DatanodeInfo[] datanodes) {
this.belowAvgUtilizedDatanodes.add(datanodeS);
} else {
assert isUnderUtilized(datanodeS) : "isUnderUtilized("
+ datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
+ datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
+ ", utilization=" + datanodeS.utilization;
this.underUtilizedDatanodes.add(datanodeS);
underLoadedBytes += (long)((avg-threshold-

View File

@ -100,11 +100,7 @@ public class DatanodeManager {
* with the same storage id; and </li>
* <li>removed if and only if an existing datanode is restarted to serve a
* different storage id.</li>
* </ul> <br>
* The list of the {@link DatanodeDescriptor}s in the map is checkpointed
* in the namespace image file. Only the {@link DatanodeInfo} part is
* persistent, the list of blocks is restored from the datanode block
* reports.
* </ul> <br>
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*/
@ -832,7 +828,9 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) {
if (InetAddresses.isInetAddress(hostStr)) {
// The IP:port is sufficient for listing in a report
dnId = new DatanodeID(hostStr, "", port);
dnId = new DatanodeID(hostStr, "", "", port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
} else {
String ipAddr = "";
try {
@ -840,7 +838,9 @@ private DatanodeID parseDNFromHostsEntry(String hostLine) {
} catch (UnknownHostException e) {
LOG.warn("Invalid hostname " + hostStr + " in hosts file");
}
dnId = new DatanodeID(ipAddr, hostStr, port);
dnId = new DatanodeID(ipAddr, hostStr, "", port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
return dnId;
}

View File

@ -667,23 +667,16 @@ void startDataNode(Configuration conf,
* @param nsInfo the namespace info from the first part of the NN handshake
*/
DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
final String xferIp = streamingAddr.getAddress().getHostAddress();
DatanodeRegistration bpRegistration = new DatanodeRegistration(xferIp, getXferPort());
bpRegistration.setInfoPort(getInfoPort());
bpRegistration.setIpcPort(getIpcPort());
bpRegistration.setHostName(hostName);
bpRegistration.setStorageID(getStorageId());
bpRegistration.setSoftwareVersion(VersionInfo.getVersion());
StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
if (storageInfo == null) {
// it's null in the case of SimulatedDataSet
bpRegistration.getStorageInfo().layoutVersion = HdfsConstants.LAYOUT_VERSION;
bpRegistration.setStorageInfo(nsInfo);
} else {
bpRegistration.setStorageInfo(storageInfo);
storageInfo = new StorageInfo(nsInfo);
}
return bpRegistration;
DatanodeID dnId = new DatanodeID(
streamingAddr.getAddress().getHostAddress(), hostName,
getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
return new DatanodeRegistration(dnId, storageInfo,
new ExportedBlockKeys(), VersionInfo.getVersion());
}
/**

View File

@ -106,7 +106,35 @@ public String getName() {
@Override
protected FSEditLogOp nextOp() throws IOException {
return reader.readOp(false);
FSEditLogOp op = reader.readOp(false);
if ((op != null) && (op.hasTransactionId())) {
long txId = op.getTransactionId();
if ((txId >= lastTxId) &&
(lastTxId != HdfsConstants.INVALID_TXID)) {
//
// Sometimes, the NameNode crashes while it's writing to the
// edit log. In that case, you can end up with an unfinalized edit log
// which has some garbage at the end.
// JournalManager#recoverUnfinalizedSegments will finalize these
// unfinished edit logs, giving them a defined final transaction
// ID. Then they will be renamed, so that any subsequent
// readers will have this information.
//
// Since there may be garbage at the end of these "cleaned up"
// logs, we want to be sure to skip it here if we've read everything
// we were supposed to read out of the stream.
// So we force an EOF on all subsequent reads.
//
long skipAmt = file.length() - tracker.getPos();
if (skipAmt > 0) {
FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId +
" out of " + lastTxId);
tracker.skip(skipAmt);
}
}
}
return op;
}
@Override
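The new branch above forces EOF once the op at lastTxId of a finalized segment has been read, by skipping the file.length() - tracker.getPos() trailing bytes. A compact stand-alone sketch of that idea; PositionTracker, segmentLength and the INVALID_TXID constant here are stand-ins for the HDFS types, not the actual classes:

import java.io.IOException;

public class TrailingPaddingSkipper {
  interface PositionTracker {
    long getPos();
    long skip(long n) throws IOException;
  }

  static final long INVALID_TXID = -12345; // stand-in constant

  /** After reading the op at lastTxId of a finalized segment, skip the
   *  remaining bytes so every later read sees EOF instead of garbage. */
  static void skipRemainder(long segmentLength, PositionTracker tracker,
                            long txId, long lastTxId) throws IOException {
    if (lastTxId == INVALID_TXID || txId < lastTxId) {
      return; // in-progress segment, or more real ops still to come
    }
    long skipAmt = segmentLength - tracker.getPos();
    if (skipAmt > 0) {
      System.err.println("skipping " + skipAmt
          + " trailing bytes after txid " + txId);
      tracker.skip(skipAmt);
    }
  }

  public static void main(String[] args) throws IOException {
    // A toy tracker positioned 100 bytes into a 160-byte segment.
    PositionTracker tracker = new PositionTracker() {
      long pos = 100;
      public long getPos() { return pos; }
      public long skip(long n) { pos += n; return n; }
    };
    skipRemainder(160, tracker, 42, 42);               // skips the last 60 bytes
    System.out.println("pos now " + tracker.getPos()); // 160
  }
}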

View File

@ -41,12 +41,13 @@
@InterfaceAudience.Private
public class EditLogFileOutputStream extends EditLogOutputStream {
private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
public static final int PREALLOCATION_LENGTH = 1024 * 1024;
private File file;
private FileOutputStream fp; // file stream for storing edit logs
private FileChannel fc; // channel of the file stream for sync
private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
static {
fill.position(0);

View File

@ -144,7 +144,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
check203UpgradeFailure(logVersion, e);
String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
FSImage.LOG.error(errorMessage);
FSImage.LOG.error(errorMessage, e);
if (recovery == null) {
// We will only try to skip over problematic opcodes when in
// recovery mode.
@ -730,29 +730,34 @@ public PositionTrackingInputStream(InputStream is) {
super(is);
}
@Override
public int read() throws IOException {
int ret = super.read();
if (ret != -1) curPos++;
return ret;
}
@Override
public int read(byte[] data) throws IOException {
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public int read(byte[] data, int offset, int length) throws IOException {
int ret = super.read(data, offset, length);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public void mark(int limit) {
super.mark(limit);
markPos = curPos;
}
@Override
public void reset() throws IOException {
if (markPos == -1) {
throw new IOException("Not marked!");
@ -765,6 +770,13 @@ public void reset() throws IOException {
public long getPos() {
return curPos;
}
@Override
public long skip(long amt) throws IOException {
long ret = super.skip(amt);
curPos += ret;
return ret;
}
}
public long getLastAppliedTxId() {
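The overrides added above exist so that curPos advances on every path that consumes bytes, including skip(); otherwise getPos() drifts and the length-based garbage skip in EditLogFileInputStream would be computed from a stale offset. A cut-down, self-contained version of the same decorator, not the HDFS class itself:

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PositionTrackingStream extends FilterInputStream {
  private long curPos = 0;

  public PositionTrackingStream(InputStream in) {
    super(in);
  }

  @Override
  public int read() throws IOException {
    int ret = super.read();
    if (ret != -1) curPos++;
    return ret;
  }

  @Override
  public int read(byte[] data, int offset, int length) throws IOException {
    int ret = super.read(data, offset, length);
    if (ret > 0) curPos += ret;
    return ret;
  }

  @Override
  public long skip(long n) throws IOException {
    long ret = super.skip(n);   // may skip fewer bytes than requested
    curPos += ret;              // the point of the hunk: count skipped bytes too
    return ret;
  }

  public long getPos() {
    return curPos;
  }

  public static void main(String[] args) throws IOException {
    PositionTrackingStream in =
        new PositionTrackingStream(new ByteArrayInputStream(new byte[32]));
    in.read();                  // pos 1
    in.read(new byte[7], 0, 7); // pos 8
    in.skip(8);                 // pos 16 -- wrong without the skip() override
    System.out.println(in.getPos()); // 16
    in.close();
  }
}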

View File

@ -75,6 +75,7 @@
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
@SuppressWarnings("deprecation")
@ -2263,31 +2264,76 @@ public Reader(DataInputStream in, int logVersion) {
*
* @param skipBrokenEdits If true, attempt to skip over damaged parts of
* the input stream, rather than throwing an IOException
* @return the operation read from the stream, or null at the end of the file
* @throws IOException on error.
* @return the operation read from the stream, or null at the end of the
* file
* @throws IOException on error. This function should only throw an
* exception when skipBrokenEdits is false.
*/
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
FSEditLogOp op = null;
while (true) {
try {
in.mark(in.available());
try {
op = decodeOp();
} finally {
// If we encountered an exception or an end-of-file condition,
// do not advance the input stream.
if (op == null) {
in.reset();
}
}
return op;
} catch (IOException e) {
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {
in.reset();
if (!skipBrokenEdits) {
throw e;
}
if (in.skip(1) < 1) {
// If we saw a terminator opcode followed by a long region of 0x00 or
// 0xff, we want to skip over that region, because there's nothing
// interesting there.
long numSkip = e.getNumAfterTerminator();
if (in.skip(numSkip) < numSkip) {
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
"garbage after an OP_INVALID. Unexpected early EOF.");
return null;
}
} catch (IOException e) {
in.reset();
if (!skipBrokenEdits) {
throw e;
}
} catch (RuntimeException e) {
// FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
// However, we handle it here for recovery mode, just to be more
// robust.
in.reset();
if (!skipBrokenEdits) {
throw e;
}
} catch (Throwable e) {
in.reset();
if (!skipBrokenEdits) {
throw new IOException("got unexpected exception " +
e.getMessage(), e);
}
}
// Move ahead one byte and re-try the decode process.
if (in.skip(1) < 1) {
return null;
}
}
}
private void verifyTerminator() throws IOException {
long off = 0;
/** The end of the edit log should contain only 0x00 or 0xff bytes.
* If it contains other bytes, the log itself may be corrupt.
* It is important to check this; if we don't, a stray OP_INVALID byte
* could make us stop reading the edit log halfway through, and we'd never
* know that we had lost data.
*/
byte[] buf = new byte[4096];
while (true) {
int numRead = in.read(buf);
if (numRead == -1) {
return;
}
for (int i = 0; i < numRead; i++, off++) {
if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
throw new GarbageAfterTerminatorException("Read garbage after " +
"the terminator!", off);
}
}
}
}
@ -2306,8 +2352,10 @@ private FSEditLogOp decodeOp() throws IOException {
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID)
if (opCode == OP_INVALID) {
verifyTerminator();
return null;
}
FSEditLogOp op = cache.get(opCode);
if (op == null) {
@ -2477,4 +2525,35 @@ public static PermissionStatus permissionStatusFromXml(Stanza st)
short mode = Short.valueOf(st.getValue("MODE"));
return new PermissionStatus(username, groupname, new FsPermission(mode));
}
}
/**
* Exception indicating that we found an OP_INVALID followed by some
* garbage. An OP_INVALID should signify the end of the file... if there
* is additional content after that, then the edit log is corrupt.
*/
static class GarbageAfterTerminatorException extends IOException {
private static final long serialVersionUID = 1L;
private final long numAfterTerminator;
public GarbageAfterTerminatorException(String str,
long numAfterTerminator) {
super(str);
this.numAfterTerminator = numAfterTerminator;
}
/**
* Get the number of bytes after the terminator at which the garbage
* appeared.
*
* So if you had an OP_INVALID followed immediately by another valid opcode,
* this would be 0.
* If you had an OP_INVALID followed by some padding bytes, followed by a
* stray byte at the end, this would be the number of padding bytes.
*
* @return numAfterTerminator
*/
public long getNumAfterTerminator() {
return numAfterTerminator;
}
}
}
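verifyTerminator() above treats anything other than 0x00 or 0xff padding after OP_INVALID as possible data loss. The same scan, restated as a stand-alone method that returns the offset of the first stray byte instead of throwing GarbageAfterTerminatorException; the example opcode byte in main is made up:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class TerminatorCheck {
  /** Scan everything after the OP_INVALID terminator; only 0x00 and 0xff
   *  padding is acceptable.  Returns the offset of the first stray byte,
   *  or -1 if the tail is clean. */
  static long findGarbageAfterTerminator(InputStream in) throws IOException {
    byte[] buf = new byte[4096];
    long off = 0;
    while (true) {
      int numRead = in.read(buf);
      if (numRead == -1) {
        return -1;                       // clean EOF: nothing but padding
      }
      for (int i = 0; i < numRead; i++, off++) {
        if (buf[i] != (byte) 0 && buf[i] != (byte) -1) {
          return off;                    // stray byte: edits may have been lost
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] cleanTail = {0, 0, 0, (byte) 0xff, (byte) 0xff};
    byte[] dirtyTail = {0, 0, 0x17, 0};  // 0x17 stands in for a real opcode
    System.out.println(
        findGarbageAfterTerminator(new ByteArrayInputStream(cleanTail))); // -1
    System.out.println(
        findGarbageAfterTerminator(new ByteArrayInputStream(dirtyTail))); // 2
  }
}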

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -536,6 +537,11 @@ public FSEditLog getEditLog() {
return editLog;
}
@VisibleForTesting
void setEditLogForTesting(FSEditLog newLog) {
editLog = newLog;
}
void openEditLogForWrite() throws IOException {
assert editLog != null : "editLog must be initialized";
editLog.openForWrite();

View File

@ -197,18 +197,33 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/***************************************************
* FSNamesystem does the actual bookkeeping work for the
* DataNode.
/**
* FSNamesystem is a container of both transient
* and persisted name-space state, and does all the book-keeping
* work on a NameNode.
*
* It tracks several important tables.
* Its roles are briefly described below:
*
* 1) valid fsname --> blocklist (kept on disk, logged)
* 1) Is the container for BlockManager, DatanodeManager,
* DelegationTokens, LeaseManager, etc. services.
* 2) RPC calls that modify or inspect the name-space
* should get delegated here.
* 3) Anything that touches only blocks (eg. block reports),
* it delegates to BlockManager.
* 4) Anything that touches only file information (eg. permissions, mkdirs),
* it delegates to FSDirectory.
* 5) Anything that crosses two of the above components should be
* coordinated here.
* 6) Logs mutations to FSEditLog.
*
* This class and its contents keep:
*
* 1) Valid fsname --> blocklist (kept on disk, logged)
* 2) Set of all valid blocks (inverted #1)
* 3) block --> machinelist (kept in memory, rebuilt dynamically from reports)
* 4) machine --> blocklist (inverted #2)
* 5) LRU cache of updated-heartbeat machines
***************************************************/
*/
@InterfaceAudience.Private
@Metrics(context="dfs")
public class FSNamesystem implements Namesystem, FSClusterStats,
@ -1868,6 +1883,7 @@ LocatedBlock getAdditionalBlock(String src,
QuotaExceededException, SafeModeException, UnresolvedLinkException,
IOException {
checkBlock(previous);
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
long fileLength, blockSize;
int replication;
DatanodeDescriptor clientNode = null;
@ -1890,10 +1906,65 @@ LocatedBlock getAdditionalBlock(String src,
// have we exceeded the configured limit of fs objects.
checkFsObjectLimit();
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
// three possibilities:
// 1) This is the first block allocation of an append() pipeline
// which started appending exactly at a block boundary.
// In this case, the client isn't passed the previous block,
// so it makes the allocateBlock() call with previous=null.
// We can distinguish this since the last block of the file
// will be exactly a full block.
// 2) This is a retry from a client that missed the response of a
// prior getAdditionalBlock() call, perhaps because of a network
// timeout, or because of an HA failover. In that case, we know
// by the fact that the client is re-issuing the RPC that it
// never began to write to the old block. Hence it is safe to
// abandon it and allocate a new one.
// 3) This is an entirely bogus request/bug -- we should error out
// rather than potentially appending a new block with an empty
// one in the middle, etc
BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() == pendingFile.getPreferredBlockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.allocateBlock: handling block allocation" +
" writing to a file with a complete previous block: src=" +
src + " lastBlock=" + lastBlockInFile);
}
} else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
// Case 2
if (lastBlockInFile.getNumBytes() != 0) {
throw new IOException(
"Request looked like a retry to allocate block " +
lastBlockInFile + " but it already contains " +
lastBlockInFile.getNumBytes() + " bytes");
}
// The retry case ("b" above) -- abandon the old block.
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
"caught retry for allocation of a new block in " +
src + ". Abandoning old block " + lastBlockInFile);
dir.removeBlock(src, pendingFile, lastBlockInFile);
dir.persistBlocks(src, pendingFile);
} else {
throw new IOException("Cannot allocate block in " + src + ": " +
"passed 'previous' block " + previous + " does not match actual " +
"last block in file " + lastBlockInFile);
}
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
commitOrCompleteLastBlock(pendingFile, previousBlock);
//
// If we fail this, bad things happen!
@ -2104,7 +2175,29 @@ private boolean completeFileInternal(String src,
throw new SafeModeException("Cannot complete file " + src, safeMode);
}
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
INodeFileUnderConstruction pendingFile;
try {
pendingFile = checkLease(src, holder);
} catch (LeaseExpiredException lee) {
INodeFile file = dir.getFileINode(src);
if (file != null && !file.isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
Block realLastBlock = file.getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
"received request from " + holder + " to complete file " + src +
" which is already closed. But, it appears to be an RPC " +
"retry. Returning success.");
return true;
}
}
throw lee;
}
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, last);
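The two hunks above make addBlock() and complete() safe to retry: a re-issued addBlock() is recognized because the client's "previous" block matches the penultimate block and the freshly allocated last block is still empty, while a re-issued complete() is recognized because the file is already closed and its real last block matches the client's. A heavily simplified restatement of the addBlock() decision, using plain (blockId, genStamp) longs instead of BlockInfo; all names here are illustrative, not the NameNode's types:

public class AllocateBlockRetryCheck {
  enum Decision { APPEND_AT_BOUNDARY, RETRY_ABANDON_LAST, ALLOCATE, REJECT }

  static final long NO_BLOCK = -1;

  static Decision classify(long prevId, long prevStamp,   // client's view
                           long lastId, long lastStamp,   // actual last block
                           long penultId, long penultStamp,
                           boolean lastBlockFull, long lastBlockBytes) {
    if (prevId == lastId && prevStamp == lastStamp) {
      return Decision.ALLOCATE;           // normal case: views agree
    }
    if (prevId == NO_BLOCK && lastId != NO_BLOCK && lastBlockFull) {
      return Decision.APPEND_AT_BOUNDARY; // case 1: append at a block boundary
    }
    if (prevId == penultId && prevStamp == penultStamp) {
      // case 2: a retried addBlock() -- safe only if nothing was written yet
      return lastBlockBytes == 0 ? Decision.RETRY_ABANDON_LAST : Decision.REJECT;
    }
    return Decision.REJECT;               // case 3: bogus request
  }

  public static void main(String[] args) {
    // A client retries addBlock(): its "previous" block is the penultimate
    // block on the NameNode, and the freshly allocated last block is empty.
    System.out.println(classify(7, 1, 8, 1, 7, 1, false, 0));
    // prints RETRY_ABANDON_LAST
  }
}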

View File

@ -29,6 +29,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.commons.logging.Log;
@ -49,6 +50,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
/**
* This class is used in Namesystem's jetty to retrieve a file.
@ -282,8 +284,7 @@ static String getParamStringToPutImage(long txid,
return "putimage=1" +
"&" + TXID_PARAM + "=" + txid +
"&port=" + imageListenAddress.getPort() +
"&machine=" + imageListenAddress.getHostName()
+ "&" + STORAGEINFO_PARAM + "=" +
"&" + STORAGEINFO_PARAM + "=" +
storage.toColonSeparatedString();
}
@ -310,7 +311,10 @@ public GetImageParams(HttpServletRequest request,
Map<String, String[]> pmap = request.getParameterMap();
isGetImage = isGetEdit = isPutImage = fetchLatest = false;
remoteport = 0;
machineName = null;
machineName = request.getRemoteHost();
if (InetAddresses.isInetAddress(machineName)) {
machineName = NetUtils.getHostNameOfIP(machineName);
}
for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
String key = entry.getKey();
@ -335,8 +339,6 @@ public GetImageParams(HttpServletRequest request,
txId = parseLongParam(request, TXID_PARAM);
} else if (key.equals("port")) {
remoteport = new Integer(val[0]).intValue();
} else if (key.equals("machine")) {
machineName = val[0];
} else if (key.equals(STORAGEINFO_PARAM)) {
storageInfoString = val[0];
}

View File

@ -112,7 +112,7 @@ private String getHttpAddress(Configuration conf) {
*/
private boolean checkAddress(String addrStr) {
InetSocketAddress addr = NetUtils.createSocketAddr(addrStr);
return addr.getPort() != 0 && !addr.getAddress().isAnyLocalAddress();
return addr.getPort() != 0;
}
public void start() {

View File

@ -21,9 +21,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
/** A class to implement an array of BlockLocations
* It provide efficient customized serialization/deserialization methods
* in stead of using the default array (de)serialization provided by RPC
/**
* Maintains an array of blocks and their corresponding storage IDs.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -36,12 +35,12 @@ public class BlocksWithLocations {
@InterfaceStability.Evolving
public static class BlockWithLocations {
Block block;
String datanodeIDs[];
String storageIDs[];
/** constructor */
public BlockWithLocations(Block b, String[] datanodes) {
block = b;
datanodeIDs = datanodes;
public BlockWithLocations(Block block, String[] storageIDs) {
this.block = block;
this.storageIDs = storageIDs;
}
/** get the block */
@ -50,15 +49,15 @@ public Block getBlock() {
}
/** get the block's locations */
public String[] getDatanodes() {
return datanodeIDs;
public String[] getStorageIDs() {
return storageIDs;
}
}
private BlockWithLocations[] blocks;
/** Constructor with one parameter */
public BlocksWithLocations( BlockWithLocations[] blocks ) {
public BlocksWithLocations(BlockWithLocations[] blocks) {
this.blocks = blocks;
}

View File

@ -47,21 +47,6 @@ public DatanodeRegistration(DatanodeID dn, StorageInfo info,
this.softwareVersion = softwareVersion;
}
public DatanodeRegistration(String ipAddr, int xferPort) {
this(ipAddr, xferPort, new StorageInfo(), new ExportedBlockKeys());
}
public DatanodeRegistration(String ipAddr, int xferPort, StorageInfo info,
ExportedBlockKeys keys) {
super(ipAddr, xferPort);
this.storageInfo = info;
this.exportedKeys = keys;
}
public void setStorageInfo(StorageInfo storage) {
this.storageInfo = new StorageInfo(storage);
}
public StorageInfo getStorageInfo() {
return storageInfo;
}
@ -74,10 +59,6 @@ public ExportedBlockKeys getExportedKeys() {
return exportedKeys;
}
public void setSoftwareVersion(String softwareVersion) {
this.softwareVersion = softwareVersion;
}
public String getSoftwareVersion() {
return softwareVersion;
}

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@ -48,7 +49,8 @@ static OfflineEditsLoader createLoader(OfflineEditsVisitor visitor,
OfflineEditsLoader loader = null;
try {
file = new File(inputFileName);
elis = new EditLogFileInputStream(file, -1, -1, false);
elis = new EditLogFileInputStream(file, HdfsConstants.INVALID_TXID,
HdfsConstants.INVALID_TXID, false);
loader = new OfflineEditsBinaryLoader(visitor, elis);
} finally {
if ((loader == null) && (elis != null)) {

View File

@ -274,7 +274,7 @@ message BlockProto {
*/
message BlockWithLocationsProto {
required BlockProto block = 1; // Block
repeated string datanodeIDs = 2; // Datanodes with replicas of the block
repeated string storageIDs = 2; // Datanodes with replicas of the block
}
/**

View File

@ -67,19 +67,23 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.base.Joiner;
@ -708,13 +712,14 @@ public static void setFederatedConfiguration(MiniDFSCluster cluster,
}
private static DatanodeID getDatanodeID(String ipAddr) {
return new DatanodeID(ipAddr, "localhost",
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT);
return new DatanodeID(ipAddr, "localhost", "",
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
}
public static DatanodeID getLocalDatanodeID() {
return new DatanodeID("127.0.0.1", "localhost",
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT);
return getDatanodeID("127.0.0.1");
}
public static DatanodeID getLocalDatanodeID(int port) {
@ -740,12 +745,14 @@ public static DatanodeInfo getLocalDatanodeInfo(int port) {
public static DatanodeInfo getDatanodeInfo(String ipAddr,
String host, int port) {
return new DatanodeInfo(new DatanodeID(ipAddr, host, port));
return new DatanodeInfo(new DatanodeID(ipAddr, host, "",
port, DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT));
}
public static DatanodeInfo getLocalDatanodeInfo(String ipAddr,
String hostname, AdminStates adminState) {
return new DatanodeInfo(ipAddr, hostname, "storage",
return new DatanodeInfo(ipAddr, hostname, "",
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
@ -760,6 +767,14 @@ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
int port, String rackLocation) {
return new DatanodeDescriptor(new DatanodeID(ipAddr, port), rackLocation);
DatanodeID dnId = new DatanodeID(ipAddr, "host", "", port,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
return new DatanodeDescriptor(dnId, rackLocation);
}
public static DatanodeRegistration getLocalDatanodeRegistration() {
return new DatanodeRegistration(getLocalDatanodeID(),
new StorageInfo(), new ExportedBlockKeys(), VersionInfo.getVersion());
}
}

View File

@ -1253,6 +1253,13 @@ public int getNameNodePort() {
public int getNameNodePort(int nnIndex) {
return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
}
/**
* @return the service rpc port used by the NameNode at the given index.
*/
public int getNameNodeServicePort(int nnIndex) {
return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort();
}
/**
* Shutdown all the nodes in the cluster.
@ -1653,20 +1660,15 @@ public Collection<URI> getNameEditsDirs(int nnIndex) throws IOException {
return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
}
private HAServiceProtocol getHaServiceClient(int nnIndex) throws IOException {
InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
return new HAServiceProtocolClientSideTranslatorPB(addr, conf);
}
public void transitionToActive(int nnIndex) throws IOException,
ServiceFailedException {
HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex),
getNameNode(nnIndex).getRpcServer().transitionToActive(
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
}
public void transitionToStandby(int nnIndex) throws IOException,
ServiceFailedException {
HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex),
getNameNode(nnIndex).getRpcServer().transitionToStandby(
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
}

View File

@ -65,10 +65,13 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.ThrowsException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
/**
* These tests make sure that DFSClient retries fetching data from DFS
* properly in case of errors.
@ -298,6 +301,100 @@ public void testFailuresArePerOperation() throws Exception
cluster.shutdown();
}
}
/**
* Test that getAdditionalBlock() and close() are idempotent. This allows
* a client to safely retry a call and still produce a correct
* file. See HDFS-3031.
*/
public void testIdempotentAllocateBlockAndClose() throws Exception {
final String src = "/testIdempotentAllocateBlock";
Path file = new Path(src);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Make the call to addBlock() get called twice, as if it were retried
// due to an IPC issue.
doAnswer(new Answer<LocatedBlock>() {
@Override
public LocatedBlock answer(InvocationOnMock invocation) throws Throwable {
LocatedBlock ret = (LocatedBlock) invocation.callRealMethod();
LocatedBlocks lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
int blockCount = lb.getLocatedBlocks().size();
assertEquals(lb.getLastLocatedBlock().getBlock(), ret.getBlock());
// Retrying should result in a new block at the end of the file.
// (abandoning the old one)
LocatedBlock ret2 = (LocatedBlock) invocation.callRealMethod();
lb = cluster.getNameNodeRpc().getBlockLocations(src, 0, Long.MAX_VALUE);
int blockCount2 = lb.getLocatedBlocks().size();
assertEquals(lb.getLastLocatedBlock().getBlock(), ret2.getBlock());
// We shouldn't have gained an extra block by the RPC.
assertEquals(blockCount, blockCount2);
return (LocatedBlock) ret2;
}
}).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
// complete() may return false a few times before it returns
// true. We want to wait until it returns true, and then
// make it retry one more time after that.
LOG.info("Called complete(: " +
Joiner.on(",").join(invocation.getArguments()) + ")");
if (!(Boolean)invocation.callRealMethod()) {
LOG.info("Complete call returned false, not faking a retry RPC");
return false;
}
// We got a successful close. Call it again to check idempotence.
try {
boolean ret = (Boolean) invocation.callRealMethod();
LOG.info("Complete call returned true, faked second RPC. " +
"Returned: " + ret);
return ret;
} catch (Throwable t) {
LOG.error("Idempotent retry threw exception", t);
throw t;
}
}
}).when(spyNN).complete(Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any());
OutputStream stm = client.create(file.toString(), true);
try {
AppendTestUtil.write(stm, 0, 10000);
stm.close();
stm = null;
} finally {
IOUtils.cleanup(LOG, stm);
}
// Make sure the mock was actually properly injected.
Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any(), Mockito.<DatanodeInfo[]>any());
Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
Mockito.anyString(), Mockito.anyString(),
Mockito.<ExtendedBlock>any());
AppendTestUtil.check(fs, file, 10000);
} finally {
cluster.shutdown();
}
}
/**
* Mock Answer implementation of NN.getBlockLocations that will return

View File

@ -533,29 +533,73 @@ public void testSubstituteForWildcardAddress() throws IOException {
public void testGetNNUris() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
final String NS1_NN1_HOST = "ns1-nn1.example.com:8020";
final String NS1_NN2_HOST = "ns1-nn1.example.com:8020";
final String NS2_NN_HOST = "ns2-nn.example.com:8020";
final String NN_HOST = "nn.example.com:8020";
final String NS1_NN1_ADDR = "ns1-nn1.example.com:8020";
final String NS1_NN2_ADDR = "ns1-nn2.example.com:8020";
final String NS2_NN_ADDR = "ns2-nn.example.com:8020";
final String NN1_ADDR = "nn.example.com:8020";
final String NN1_SRVC_ADDR = "nn.example.com:8021";
final String NN2_ADDR = "nn2.example.com:8020";
conf.set(DFS_FEDERATION_NAMESERVICES, "ns1,ns2");
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2");
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_HOST);
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), NS1_NN1_ADDR);
conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST);
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_ADDR);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "ns2"),
NS2_NN_HOST);
NS2_NN_ADDR);
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN_HOST);
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "hdfs://" + NN1_ADDR);
Collection<URI> uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN2_ADDR);
Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(4, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
assertTrue(uris.contains(new URI("hdfs://" + NN2_ADDR)));
// Make sure that non-HDFS URIs in fs.defaultFS don't get included.
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
"viewfs://vfs-name.example.com");
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(3, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_HOST)));
assertTrue(uris.contains(new URI("hdfs://" + NN_HOST)));
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
// Make sure that an HA URI being the default URI doesn't result in multiple
// entries being returned.
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(3, uris.size());
assertTrue(uris.contains(new URI("hdfs://ns1")));
assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_ADDR)));
assertTrue(uris.contains(new URI("hdfs://" + NN1_ADDR)));
// Make sure that when a service RPC address is used that is distinct from
// the client RPC address, and that client RPC address is also used as the
// default URI, that the client URI does not end up in the set of URIs
// returned.
conf = new HdfsConfiguration();
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + NN1_ADDR);
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, NN1_ADDR);
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, NN1_SRVC_ADDR);
uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY);
assertEquals(1, uris.size());
assertTrue(uris.contains(new URI("hdfs://" + NN1_SRVC_ADDR)));
}
}
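For context, getNameServiceUris() consults the keys in the order given (service RPC address before client RPC address here) and folds in fs.defaultFS when it is an HDFS URI not already covered by a nameservice. A minimal sketch of the call, with key names and the signature taken from the test above; the wrapper class itself is hypothetical:
import java.net.URI;
import java.util.Collection;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
public class ListNameNodeUris {
  public static void main(String[] args) {
    HdfsConfiguration conf = new HdfsConfiguration(); // picks up hdfs-site.xml
    Collection<URI> uris = DFSUtil.getNameServiceUris(conf,
        DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
    for (URI uri : uris) {
      // One URI per HA nameservice (logical "hdfs://nsX") or per standalone NameNode.
      System.out.println(uri);
    }
  }
}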

View File

@ -39,8 +39,7 @@
import org.apache.log4j.Level;
/**
* This class tests that a file need not be closed before its
* data can be read by another client.
* This class tests that pipelines survive data node death and recovery.
*/
public class TestDatanodeDeath extends TestCase {
{

View File

@ -38,8 +38,7 @@
import org.junit.Test;
/**
* This class tests that a file need not be closed before its
* data can be read by another client.
* This class tests data node registration.
*/
public class TestDatanodeRegistration {

View File

@ -21,10 +21,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
@ -43,9 +39,13 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
/** This class implements some of the tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
public class TestFileAppend3 {
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
@ -64,29 +64,28 @@ public class TestFileAppend3 extends junit.framework.TestCase {
private static MiniDFSCluster cluster;
private static DistributedFileSystem fs;
public static Test suite() {
return new TestSetup(new TestSuite(TestFileAppend3.class)) {
protected void setUp() throws java.lang.Exception {
AppendTestUtil.LOG.info("setUp()");
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
fs = (DistributedFileSystem)cluster.getFileSystem();
}
protected void tearDown() throws Exception {
AppendTestUtil.LOG.info("tearDown()");
if(fs != null) fs.close();
if(cluster != null) cluster.shutdown();
}
};
@BeforeClass
public static void setUp() throws java.lang.Exception {
AppendTestUtil.LOG.info("setUp()");
conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);
buffersize = conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
fs = (DistributedFileSystem)cluster.getFileSystem();
}
@AfterClass
public static void tearDown() throws Exception {
AppendTestUtil.LOG.info("tearDown()");
if(fs != null) fs.close();
if(cluster != null) cluster.shutdown();
}
/**
* TC1: Append on block boundary.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC1() throws Exception {
final Path p = new Path("/TC1/foo");
System.out.println("p=" + p);
@ -115,6 +114,7 @@ public void testTC1() throws Exception {
* TC2: Append on non-block boundary.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC2() throws Exception {
final Path p = new Path("/TC2/foo");
System.out.println("p=" + p);
@ -145,6 +145,7 @@ public void testTC2() throws Exception {
* TC5: Only one simultaneous append.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC5() throws Exception {
final Path p = new Path("/TC5/foo");
System.out.println("p=" + p);
@ -175,6 +176,7 @@ public void testTC5() throws Exception {
* TC7: Corrupted replicas are present.
* @throws IOException an exception might be thrown
*/
@Test
public void testTC7() throws Exception {
final short repl = 2;
final Path p = new Path("/TC7/foo");
@ -224,6 +226,7 @@ public void testTC7() throws Exception {
* TC11: Racing rename
* @throws IOException an exception might be thrown
*/
@Test
public void testTC11() throws Exception {
final Path p = new Path("/TC11/foo");
System.out.println("p=" + p);
@ -282,6 +285,7 @@ public void testTC11() throws Exception {
* TC12: Append to partial CRC chunk
* @throws IOException an exception might be thrown
*/
@Test
public void testTC12() throws Exception {
final Path p = new Path("/TC12/foo");
System.out.println("p=" + p);
@ -313,6 +317,7 @@ public void testTC12() throws Exception {
* *
* @throws IOException
*/
@Test
public void testAppendToPartialChunk() throws IOException {
final Path p = new Path("/partialChunk/foo");
final int fileLen = 513;

View File

@ -31,11 +31,13 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* This class tests that a file need not be closed before its
* data can be read by another client.
* This class tests client lease recovery.
*/
public class TestFileCreationClient extends junit.framework.TestCase {
public class TestFileCreationClient {
static final String DIR = "/" + TestFileCreationClient.class.getSimpleName() + "/";
{
@ -46,6 +48,7 @@ public class TestFileCreationClient extends junit.framework.TestCase {
}
/** Test lease recovery Triggered by DFSClient. */
@Test
public void testClientTriggeredLeaseRecovery() throws Exception {
final int REPLICATION = 3;
Configuration conf = new HdfsConfiguration();

View File

@ -101,18 +101,18 @@ public void testGetBlocks() throws Exception {
BlockWithLocations[] locs;
locs = namenode.getBlocks(dataNodes[0], fileLen).getBlocks();
assertEquals(locs.length, 2);
assertEquals(locs[0].getDatanodes().length, 2);
assertEquals(locs[1].getDatanodes().length, 2);
assertEquals(locs[0].getStorageIDs().length, 2);
assertEquals(locs[1].getStorageIDs().length, 2);
// get blocks of size BlockSize from dataNodes[0]
locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getDatanodes().length, 2);
assertEquals(locs[0].getStorageIDs().length, 2);
// get blocks of size 1 from dataNodes[0]
locs = namenode.getBlocks(dataNodes[0], 1).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getDatanodes().length, 2);
assertEquals(locs[0].getStorageIDs().length, 2);
// get blocks of size 0 from dataNodes[0]
getBlocksWithException(namenode, dataNodes[0], 0);

View File

@ -38,8 +38,7 @@
import org.junit.Test;
/**
* This class tests that a file need not be closed before its
* data can be read by another client.
* This class tests that data nodes are correctly replaced on failure.
*/
public class TestReplaceDatanodeOnFailure {
static final Log LOG = AppendTestUtil.LOG;

View File

@ -161,7 +161,7 @@ private static BlockWithLocations getBlockWithLocations(int bid) {
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
assertEquals(locs1.getBlock(), locs2.getBlock());
assertTrue(Arrays.equals(locs1.getDatanodes(), locs2.getDatanodes()));
assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs()));
}
@Test

View File

@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -115,7 +116,7 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
0, HdfsConstants.LAYOUT_VERSION))
.when(mock).versionRequest();
Mockito.doReturn(new DatanodeRegistration("1.2.3.4", 100))
Mockito.doReturn(DFSTestUtil.getLocalDatanodeRegistration())
.when(mock).registerDatanode(Mockito.any(DatanodeRegistration.class));
Mockito.doAnswer(new HeartbeatAnswer(nnIdx))

View File

@ -35,10 +35,12 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@ -765,6 +767,7 @@ private static class TinyDatanode implements Comparable<String> {
ArrayList<Block> blocks;
int nrBlocks; // actual number of blocks
long[] blockReportList;
int dnIdx;
/**
* Return a 6 digit integer port.
@ -780,11 +783,7 @@ private static int getNodePort(int num) throws IOException {
}
TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
String ipAddr = DNS.getDefaultIP("default");
String hostName = DNS.getDefaultHost("default", "default");
dnRegistration = new DatanodeRegistration(ipAddr, getNodePort(dnIdx));
dnRegistration.setHostName(hostName);
dnRegistration.setSoftwareVersion(VersionInfo.getVersion());
this.dnIdx = dnIdx;
this.blocks = new ArrayList<Block>(blockCapacity);
this.nrBlocks = 0;
}
@ -800,7 +799,14 @@ String getXferAddr() {
void register() throws IOException {
// get versions from the namenode
nsInfo = nameNodeProto.versionRequest();
dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
dnRegistration = new DatanodeRegistration(
new DatanodeID(DNS.getDefaultIP("default"),
DNS.getDefaultHost("default", "default"),
"", getNodePort(dnIdx),
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT),
new DataStorage(nsInfo, ""),
new ExportedBlockKeys(), VersionInfo.getVersion());
DataNode.setNewStorageID(dnRegistration);
// register datanode
dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
@ -896,12 +902,9 @@ private int transferBlocks( Block blocks[],
for(int t = 0; t < blockTargets.length; t++) {
DatanodeInfo dnInfo = blockTargets[t];
DatanodeRegistration receivedDNReg;
receivedDNReg =
new DatanodeRegistration(dnInfo.getIpAddr(), dnInfo.getXferPort());
receivedDNReg.setStorageInfo(
new DataStorage(nsInfo, dnInfo.getStorageID()));
receivedDNReg.setInfoPort(dnInfo.getInfoPort());
receivedDNReg.setIpcPort(dnInfo.getIpcPort());
receivedDNReg = new DatanodeRegistration(dnInfo,
new DataStorage(nsInfo, dnInfo.getStorageID()),
new ExportedBlockKeys(), VersionInfo.getVersion());
ReceivedDeletedBlockInfo[] rdBlocks = {
new ReceivedDeletedBlockInfo(
blocks[i], ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,

View File

@ -527,7 +527,7 @@ public void testEditChecksum() throws Exception {
} catch (IOException e) {
// expected
assertEquals("Cause of exception should be ChecksumException",
e.getCause().getClass(), ChecksumException.class);
ChecksumException.class, e.getCause().getClass());
}
}

View File

@ -68,7 +68,7 @@ public void testPreallocation() throws IOException {
assertEquals(1, validation.getNumTransactions());
assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
"for the version number",
PREALLOCATION_LENGTH, editLog.length());
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
cluster.getFileSystem().mkdirs(new Path("/tmp"),
@ -82,7 +82,7 @@ public void testPreallocation() throws IOException {
assertEquals(2, validation.getNumTransactions());
assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
PREALLOCATION_LENGTH, editLog.length());
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
// 256 blocks for the 1MB of preallocation space
assertTrue("Edit log disk space used should be at least 257 blocks",
256 * 4096 <= new DU(editLog, conf).getUsed());

View File

@ -25,6 +25,8 @@
import java.util.Set;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,7 +39,6 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@ -213,15 +214,129 @@ public Set<Long> getValidTxIds() {
public void testSkipEdit() throws IOException {
runEditLogTest(new EltsTestGarbageInEditLog());
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been truncated. */
@Test(timeout=180000)
public void testRecoverTruncatedEditLog() throws IOException {
/**
* An algorithm for corrupting an edit log.
*/
static interface Corruptor {
/*
* Corrupt an edit log file.
*
* @param editFile The edit log file
*/
public void corrupt(File editFile) throws IOException;
/*
* Explain whether we need to read the log in recovery mode
*
* @param finalized True if the edit log in question is finalized.
* We're a little more lax about reading unfinalized
* logs. We will allow a small amount of garbage at
* the end. In a finalized log, every byte must be
* perfect.
*
* @return Whether we need to read the log in recovery mode
*/
public boolean needRecovery(boolean finalized);
/*
* Get the name of this corruptor
*
* @return The Corruptor name
*/
public String getName();
}
static class TruncatingCorruptor implements Corruptor {
@Override
public void corrupt(File editFile) throws IOException {
// Corrupt the last edit
long fileLen = editFile.length();
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.setLength(fileLen - 1);
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
return finalized;
}
@Override
public String getName() {
return "truncated";
}
}
static class PaddingCorruptor implements Corruptor {
@Override
public void corrupt(File editFile) throws IOException {
// Add junk to the end of the file
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.seek(editFile.length());
for (int i = 0; i < 129; i++) {
rwf.write((byte)0);
}
rwf.write(0xd);
rwf.write(0xe);
rwf.write(0xa);
rwf.write(0xd);
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
// With finalized edit logs, we ignore what's at the end as long as we
// can make it to the correct transaction ID.
// With unfinalized edit logs, the finalization process ignores garbage
// at the end.
return false;
}
@Override
public String getName() {
return "padFatal";
}
}
static class SafePaddingCorruptor implements Corruptor {
private byte padByte;
public SafePaddingCorruptor(byte padByte) {
this.padByte = padByte;
assert ((this.padByte == 0) || (this.padByte == -1));
}
@Override
public void corrupt(File editFile) throws IOException {
// Add junk to the end of the file
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.seek(editFile.length());
rwf.write((byte)-1);
for (int i = 0; i < 1024; i++) {
rwf.write(padByte);
}
rwf.close();
}
@Override
public boolean needRecovery(boolean finalized) {
return false;
}
@Override
public String getName() {
return "pad" + ((int)padByte);
}
}
static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
throws IOException {
final String TEST_PATH = "/test/path/dir";
final int NUM_TEST_MKDIRS = 10;
// start a cluster
final boolean needRecovery = corruptor.needRecovery(finalize);
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
@ -230,6 +345,15 @@ public void testRecoverTruncatedEditLog() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
cluster.waitActive();
if (!finalize) {
// Normally, the in-progress edit log would be finalized by
// FSEditLog#endCurrentLogSegment. For testing purposes, we
// disable that here.
FSEditLog spyLog =
spy(cluster.getNameNode().getFSImage().getEditLog());
doNothing().when(spyLog).endCurrentLogSegment(true);
cluster.getNameNode().getFSImage().setEditLogForTesting(spyLog);
}
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage();
@ -246,13 +370,11 @@ public void testRecoverTruncatedEditLog() throws IOException {
File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile();
assertTrue("Should exist: " + editFile, editFile.exists());
// Corrupt the last edit
long fileLen = editFile.length();
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.setLength(fileLen - 1);
rwf.close();
// Make sure that we can't start the cluster normally before recovery
// Corrupt the edit log
corruptor.corrupt(editFile);
// If needRecovery == true, make sure that we can't start the
// cluster normally before recovery
cluster = null;
try {
LOG.debug("trying to start normally (this should fail)...");
@ -260,16 +382,24 @@ public void testRecoverTruncatedEditLog() throws IOException {
.format(false).build();
cluster.waitActive();
cluster.shutdown();
fail("expected the truncated edit log to prevent normal startup");
if (needRecovery) {
fail("expected the corrupted edit log to prevent normal startup");
}
} catch (IOException e) {
// success
if (!needRecovery) {
LOG.error("Got unexpected failure with " + corruptor.getName() +
corruptor, e);
fail("got unexpected exception " + e.getMessage());
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Perform recovery
// Perform NameNode recovery.
// Even if there was nothing wrong previously (needRecovery == false),
// this should still work fine.
cluster = null;
try {
LOG.debug("running recovery...");
@ -277,22 +407,22 @@ public void testRecoverTruncatedEditLog() throws IOException {
.format(false).startupOption(recoverStartOpt).build();
} catch (IOException e) {
fail("caught IOException while trying to recover. " +
"message was " + e.getMessage() +
"message was " + e.getMessage() +
"\nstack trace\n" + StringUtils.stringifyException(e));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Make sure that we can start the cluster normally after recovery
cluster = null;
try {
LOG.debug("starting cluster normally after recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build();
LOG.debug("testRecoverTruncatedEditLog: successfully recovered the " +
"truncated edit log");
LOG.debug("successfully recovered the " + corruptor.getName() +
" corrupted edit log");
assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
} catch (IOException e) {
fail("failed to recover. Error message: " + e.getMessage());
@ -302,4 +432,36 @@ public void testRecoverTruncatedEditLog() throws IOException {
}
}
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been truncated. */
@Test(timeout=180000)
public void testRecoverTruncatedEditLog() throws IOException {
testNameNodeRecoveryImpl(new TruncatingCorruptor(), true);
testNameNodeRecoveryImpl(new TruncatingCorruptor(), false);
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been padded with garbage. */
@Test(timeout=180000)
public void testRecoverPaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new PaddingCorruptor(), true);
testNameNodeRecoveryImpl(new PaddingCorruptor(), false);
}
/** Test that we don't need to recover from a situation where the last
* entry in the edit log has been padded with 0. */
@Test(timeout=180000)
public void testRecoverZeroPaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)0), true);
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)0), false);
}
/** Test that we don't need to recover from a situation where the last
* entry in the edit log has been padded with 0xff bytes. */
@Test(timeout=180000)
public void testRecoverNegativeOnePaddedEditLog() throws IOException {
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)-1), true);
testNameNodeRecoveryImpl(new SafePaddingCorruptor((byte)-1), false);
}
}
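The Corruptor hook makes the damage strategy pluggable: a new strategy only has to say how it mangles the file and whether a finalized log should then require recovery mode. A purely hypothetical sketch of one more implementation, assuming the Corruptor interface and the File/IOException/RandomAccessFile imports already present in this test; whether a flipped final byte behaves like the truncated case is an assumption for illustration:
  static class LastByteFlippingCorruptor implements Corruptor {
    @Override
    public void corrupt(File editFile) throws IOException {
      RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
      try {
        rwf.seek(editFile.length() - 1);
        rwf.write(0xff); // overwrite the final byte of the last serialized op
      } finally {
        rwf.close();
      }
    }
    @Override
    public boolean needRecovery(boolean finalized) {
      // Assumption for this sketch: like the truncated case, only a
      // finalized log with a damaged tail forces recovery mode.
      return finalized;
    }
    @Override
    public String getName() {
      return "lastByteFlipped";
    }
  }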

View File

@ -256,16 +256,21 @@ public void testFailureToReadEditsOnTransitionToActive() throws Exception {
// Shutdown the active NN.
cluster.shutdownNameNode(0);
Runtime mockRuntime = mock(Runtime.class);
cluster.getNameNode(1).setRuntimeForTesting(mockRuntime);
verify(mockRuntime, times(0)).exit(anyInt());
try {
// Transition the standby to active.
cluster.transitionToActive(1);
fail("Standby transitioned to active, but should not have been able to");
} catch (ServiceFailedException sfe) {
LOG.info("got expected exception: " + sfe.toString(), sfe);
Throwable sfeCause = sfe.getCause();
LOG.info("got expected exception: " + sfeCause.toString(), sfeCause);
assertTrue("Standby failed to catch up for some reason other than "
+ "failure to read logs", sfe.toString().contains(
+ "failure to read logs", sfeCause.getCause().toString().contains(
EditLogInputException.class.getName()));
}
verify(mockRuntime, times(1)).exit(anyInt());
}
private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException {

View File

@ -85,12 +85,52 @@ public class TestPipelinesFailover {
private static final int STRESS_NUM_THREADS = 25;
private static final int STRESS_RUNTIME = 40000;
enum TestScenario {
GRACEFUL_FAILOVER {
void run(MiniDFSCluster cluster) throws IOException {
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
}
},
ORIGINAL_ACTIVE_CRASHED {
void run(MiniDFSCluster cluster) throws IOException {
cluster.restartNameNode(0);
cluster.transitionToActive(1);
}
};
abstract void run(MiniDFSCluster cluster) throws IOException;
}
enum MethodToTestIdempotence {
ALLOCATE_BLOCK,
COMPLETE_FILE;
}
/**
* Tests continuing a write pipeline over a failover.
*/
@Test(timeout=30000)
public void testWriteOverFailover() throws Exception {
public void testWriteOverGracefulFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.GRACEFUL_FAILOVER,
MethodToTestIdempotence.ALLOCATE_BLOCK);
}
@Test(timeout=30000)
public void testAllocateBlockAfterCrashFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
MethodToTestIdempotence.ALLOCATE_BLOCK);
}
@Test(timeout=30000)
public void testCompleteFileAfterCrashFailover() throws Exception {
doWriteOverFailoverTest(TestScenario.ORIGINAL_ACTIVE_CRASHED,
MethodToTestIdempotence.COMPLETE_FILE);
}
private void doWriteOverFailoverTest(TestScenario scenario,
MethodToTestIdempotence methodToTest) throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// Don't check replication periodically.
@ -102,6 +142,8 @@ public void testWriteOverFailover() throws Exception {
.numDataNodes(3)
.build();
try {
int sizeWritten = 0;
cluster.waitActive();
cluster.transitionToActive(0);
Thread.sleep(500);
@ -112,28 +154,39 @@ public void testWriteOverFailover() throws Exception {
// write a block and a half
AppendTestUtil.write(stm, 0, BLOCK_AND_A_HALF);
sizeWritten += BLOCK_AND_A_HALF;
// Make sure all of the blocks are written out before failover.
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
scenario.run(cluster);
assertTrue(fs.exists(TEST_PATH));
// NOTE: explicitly do *not* make any further metadata calls
// to the NN here. The next IPC call should be to allocate the next
// block. Any other call would notice the failover and not test
// idempotence of the operation (HDFS-3031)
FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
BlockManagerTestUtil.updateState(ns1.getBlockManager());
assertEquals(0, ns1.getPendingReplicationBlocks());
assertEquals(0, ns1.getCorruptReplicaBlocks());
assertEquals(0, ns1.getMissingBlocksCount());
// write another block and a half
AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
// If we're testing allocateBlock()'s idempotence, write another
// block and a half, so we have to allocate a new block.
// Otherwise, don't write anything, so our next RPC will be
// completeFile() if we're testing idempotence of that operation.
if (methodToTest == MethodToTestIdempotence.ALLOCATE_BLOCK) {
// write another block and a half
AppendTestUtil.write(stm, sizeWritten, BLOCK_AND_A_HALF);
sizeWritten += BLOCK_AND_A_HALF;
}
stm.close();
stm = null;
AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE * 3);
AppendTestUtil.check(fs, TEST_PATH, sizeWritten);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
@ -146,7 +199,18 @@ public void testWriteOverFailover() throws Exception {
* even when the pipeline was constructed on a different NN.
*/
@Test(timeout=30000)
public void testWriteOverFailoverWithDnFail() throws Exception {
public void testWriteOverGracefulFailoverWithDnFail() throws Exception {
doTestWriteOverFailoverWithDnFail(TestScenario.GRACEFUL_FAILOVER);
}
@Test(timeout=30000)
public void testWriteOverCrashFailoverWithDnFail() throws Exception {
doTestWriteOverFailoverWithDnFail(TestScenario.ORIGINAL_ACTIVE_CRASHED);
}
private void doTestWriteOverFailoverWithDnFail(TestScenario scenario)
throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@ -171,8 +235,7 @@ public void testWriteOverFailoverWithDnFail() throws Exception {
stm.hflush();
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
scenario.run(cluster);
assertTrue(fs.exists(TEST_PATH));
@ -183,8 +246,8 @@ public void testWriteOverFailoverWithDnFail() throws Exception {
stm.hflush();
LOG.info("Failing back to NN 0");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
cluster.stopDataNode(1);

View File

@ -67,8 +67,8 @@ public void testFailureToTransitionCausesShutdown() throws IOException {
fail("Transitioned to active but should not have been able to.");
} catch (ServiceFailedException sfe) {
assertExceptionContains("Error encountered requiring NN shutdown. " +
"Shutting down immediately.", sfe);
LOG.info("got expected exception", sfe);
"Shutting down immediately.", sfe.getCause());
LOG.info("got expected exception", sfe.getCause());
}
verify(mockRuntime, times(1)).exit(anyInt());
} finally {

View File

@ -109,7 +109,25 @@ Trunk (unreleased changes)
MAPREDUCE-1740. NPE in getMatchingLevelForNodes when node locations are
variable depth (ahmed via tucu) [IMPORTANT: this is dead code in trunk]
Release 2.0.0 - UNRELEASED
Release 2.0.1-alpha - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-4148. MapReduce should not have a compile-time dependency on
HDFS. (tomwhite)
MAPREDUCE-4250. hadoop-config.sh missing variable exports, causes Yarn
jobs to fail with ClassNotFoundException MRAppMaster. (phunt via tucu)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@ -287,9 +305,6 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4231. Update RAID to use the new BlockCollection interface.
(szetszwo)
MAPREDUCE-4148. MapReduce should not have a compile-time dependency on
HDFS. (tomwhite)
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES
@ -483,6 +498,11 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4233. NPE can happen in RMNMNodeInfo. (bobby)
MAPREDUCE-4238. mavenize data_join. (tgraves)
MAPREDUCE-4102. job counters not available in Jobhistory webui for
killed jobs (Bhallamudi Venkata Siva Kamesh via tgraves)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -69,7 +69,7 @@ public class CountersBlock extends HtmlBlock {
return;
}
if(total == null || total.getGroupNames() == null) {
if(total == null || total.getGroupNames() == null || total.countCounters() == 0) {
String type = $(TASK_ID);
if(type == null || type.isEmpty()) {
type = $(JOB_ID, "the job");
@ -180,14 +180,25 @@ private void getCounters(AppContext ctx) {
// Get all types of counters
Map<TaskId, Task> tasks = job.getTasks();
total = job.getAllCounters();
boolean needTotalCounters = false;
if (total == null) {
total = new Counters();
needTotalCounters = true;
}
map = new Counters();
reduce = new Counters();
for (Task t : tasks.values()) {
Counters counters = t.getCounters();
if (counters == null) {
continue;
}
switch (t.getType()) {
case MAP: map.incrAllCounters(counters); break;
case REDUCE: reduce.incrAllCounters(counters); break;
}
if (needTotalCounters) {
total.incrAllCounters(counters);
}
}
}

View File

@ -81,6 +81,9 @@ private void getCounters(AppContext ctx, Job job) {
Map<TaskId, Task> tasks = job.getTasks();
for (Task t : tasks.values()) {
Counters counters = t.getCounters();
if (counters == null) {
continue;
}
total.incrAllCounters(counters);
switch (t.getType()) {
case MAP:

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -131,6 +132,17 @@ public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
}
return map;
}
public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks) {
Map<JobId, Job> map = Maps.newHashMap();
for (int j = 0; j < numJobsPerApp; ++j) {
Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask, null,
hasFailedTasks);
map.put(job.getID(), job);
}
return map;
}
public static JobId newJobID(ApplicationId appID, int i) {
JobId id = Records.newRecord(JobId.class);
@ -316,16 +328,16 @@ public String getNodeRackName() {
};
}
public static Map<TaskId, Task> newTasks(JobId jid, int n, int m) {
public static Map<TaskId, Task> newTasks(JobId jid, int n, int m, boolean hasFailedTasks) {
Map<TaskId, Task> map = Maps.newHashMap();
for (int i = 0; i < n; ++i) {
Task task = newTask(jid, i, m);
Task task = newTask(jid, i, m, hasFailedTasks);
map.put(task.getID(), task);
}
return map;
}
public static Task newTask(JobId jid, int i, int m) {
public static Task newTask(JobId jid, int i, int m, final boolean hasFailedTasks) {
final TaskId tid = Records.newRecord(TaskId.class);
tid.setJobId(jid);
tid.setId(i);
@ -345,6 +357,9 @@ public TaskReport getReport() {
@Override
public Counters getCounters() {
if (hasFailedTasks) {
return null;
}
return new Counters(
TypeConverter.fromYarn(report.getCounters()));
}
@ -394,8 +409,14 @@ public TaskState getState() {
public static Counters getCounters(
Collection<Task> tasks) {
List<Task> completedTasks = new ArrayList<Task>();
for (Task task : tasks) {
if (task.getCounters() != null) {
completedTasks.add(task);
}
}
Counters counters = new Counters();
return JobImpl.incrTaskCounters(counters, tasks);
return JobImpl.incrTaskCounters(counters, completedTasks);
}
static class TaskCount {
@ -434,10 +455,15 @@ public static Job newJob(ApplicationId appID, int i, int n, int m) {
}
public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
return newJob(appID, i, n, m, confFile, false);
}
public static Job newJob(ApplicationId appID, int i, int n, int m,
Path confFile, boolean hasFailedTasks) {
final JobId id = newJobID(appID, i);
final String name = newJobName();
final JobReport report = newJobReport(id);
final Map<TaskId, Task> tasks = newTasks(id, n, m);
final Map<TaskId, Task> tasks = newTasks(id, n, m, hasFailedTasks);
final TaskCount taskCount = getTaskCount(tasks.values());
final Counters counters = getCounters(tasks
.values());

View File

@ -43,6 +43,14 @@ public static JobsPair newHistoryJobs(ApplicationId appID, int numJobsPerApp,
numAttemptsPerTask);
return split(mocked);
}
public static JobsPair newHistoryJobs(ApplicationId appID, int numJobsPerApp,
int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks)
throws IOException {
Map<JobId, Job> mocked = newJobs(appID, numJobsPerApp, numTasksPerJob,
numAttemptsPerTask, hasFailedTasks);
return split(mocked);
}
private static JobsPair split(Map<JobId, Job> mocked) throws IOException {
JobsPair ret = new JobsPair();

View File

@ -63,10 +63,16 @@ static class TestAppContext implements AppContext {
final Map<JobId, Job> jobs;
final long startTime = System.currentTimeMillis();
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
boolean hasFailedTasks) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts,
hasFailedTasks);
}
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
this(appid, numJobs, numTasks, numAttempts, false);
}
TestAppContext() {
@ -198,6 +204,14 @@ public void testTaskView() {
appContext, params);
}
@Test public void testJobCounterViewForKilledJob() {
LOG.info("JobCounterViewForKilledJob");
AppContext appContext = new TestAppContext(0, 1, 1, 1, true);
Map<String, String> params = TestAMWebApp.getJobParams(appContext);
WebAppTests.testPage(HsCountersPage.class, AppContext.class,
appContext, params);
}
@Test public void testSingleCounterView() {
LOG.info("HsSingleCounterPage");
WebAppTests.testPage(HsSingleCounterPage.class, AppContext.class,

View File

@ -101,13 +101,15 @@ static class TestAppContext implements HistoryContext {
final Map<JobId, Job> partialJobs;
final Map<JobId, Job> fullJobs;
final long startTime = System.currentTimeMillis();
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts,
boolean hasFailedTasks) {
appID = MockJobs.newAppID(appid);
appAttemptID = MockJobs.newAppAttemptID(appID, 0);
JobsPair jobs;
try {
jobs = MockHistoryJobs.newHistoryJobs(appID, numJobs, numTasks, numAttempts);
jobs = MockHistoryJobs.newHistoryJobs(appID, numJobs, numTasks,
numAttempts, hasFailedTasks);
} catch (IOException e) {
throw new YarnException(e);
}
@ -115,6 +117,10 @@ static class TestAppContext implements HistoryContext {
fullJobs = jobs.full;
}
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
this(appid, numJobs, numTasks, numAttempts, false);
}
TestAppContext() {
this(0, 1, 2, 1);
}
@ -628,6 +634,46 @@ public void testJobCountersSlash() throws JSONException, Exception {
verifyHsJobCounters(info, jobsMap.get(id));
}
}
@Test
public void testJobCountersForKilledJob() throws Exception {
WebResource r = resource();
appContext = new TestAppContext(0, 1, 1, 1, true);
injector = Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
webApp = mock(HsWebApp.class);
when(webApp.name()).thenReturn("hsmockwebapp");
bind(JAXBContextResolver.class);
bind(HsWebServices.class);
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
}
});
Map<JobId, Job> jobsMap = appContext.getAllJobs();
for (JobId id : jobsMap.keySet()) {
String jobId = MRApps.toString(id);
ClientResponse response = r.path("ws").path("v1").path("history")
.path("mapreduce").path("jobs").path(jobId).path("counters/")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobCounters");
WebServicesTestUtils.checkStringMatch("id", MRApps.toString(id),
info.getString("id"));
assertTrue("Job shouldn't contain any counters", info.length() == 1);
}
}
@Test
public void testJobCountersDefault() throws JSONException, Exception {

View File

@ -49,7 +49,7 @@ then
fi
# Allow alternate conf dir location.
YARN_CONF_DIR="${HADOOP_CONF_DIR:-$YARN_HOME/conf}"
export YARN_CONF_DIR="${HADOOP_CONF_DIR:-$YARN_HOME/conf}"
# check to see if it is specified whether to use the slaves or the
# masters file

View File

@ -228,6 +228,11 @@
<artifactId>hadoop-distcp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-datajoin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-rumen</artifactId>

View File

@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-datajoin</artifactId>
<version>3.0.0-SNAPSHOT</version>
<description>Apache Hadoop Data Join</description>
<name>Apache Hadoop Data Join</name>
<packaging>jar</packaging>
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-log-dir</id>
<phase>process-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
<mkdir dir="${hadoop.log.dir}"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -51,7 +51,7 @@ public abstract class JobBase implements Mapper, Reducer {
* the value for the counter
*/
protected void setLongValue(Object name, long value) {
this.longCounters.put(name, new Long(value));
this.longCounters.put(name, Long.valueOf(value));
}
/**
@ -100,9 +100,9 @@ protected Long addLongValue(Object name, long inc) {
Long val = this.longCounters.get(name);
Long retv = null;
if (val == null) {
retv = new Long(inc);
retv = Long.valueOf(inc);
} else {
retv = new Long(val.longValue() + inc);
retv = Long.valueOf(val.longValue() + inc);
}
this.longCounters.put(name, retv);
return retv;
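The switch from new Long(...) to Long.valueOf(...) is a small allocation win: valueOf() may return a cached box (the spec guarantees caching at least for values in -128..127), while the constructor always allocates a fresh object. A throwaway sketch illustrating the difference (not part of the patch):
public class BoxingSketch {
  public static void main(String[] args) {
    // valueOf() returns the same cached instance for small values ...
    System.out.println(Long.valueOf(100L) == Long.valueOf(100L)); // true
    // ... while the constructor always creates a distinct object.
    System.out.println(new Long(100L) == new Long(100L));         // false
  }
}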

View File

@ -57,6 +57,7 @@ protected void tearDown() throws Exception {
public void testDataJoin() throws Exception {
final int srcs = 4;
JobConf job = new JobConf();
job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
Path base = cluster.getFileSystem().makeQualified(new Path("/inner"));
Path[] src = writeSimpleSrc(base, job, srcs);
job.setInputFormat(SequenceFileInputFormat.class);

View File

@ -52,6 +52,11 @@
<artifactId>hadoop-rumen</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-datajoin</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-extras</artifactId>

View File

@ -32,6 +32,7 @@
<module>hadoop-distcp</module>
<module>hadoop-archives</module>
<module>hadoop-rumen</module>
<module>hadoop-datajoin</module>
<module>hadoop-tools-dist</module>
<module>hadoop-extras</module>
</modules>