Merge r1291972 through r1293033 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1293035 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-02-23 23:52:26 +00:00
commit 024c23c95f
37 changed files with 291 additions and 201 deletions

View File

@ -91,6 +91,10 @@ Release 0.23.2 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-7660. Maven generated .classpath doesnot includes
"target/generated-test-source/java" as source directory.
(Laxman via bobby)
HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading
it back into HDFS, the put fails due to a CRC mismatch it back into HDFS, the put fails due to a CRC mismatch
(Daryn Sharp via bobby) (Daryn Sharp via bobby)

View File

@ -448,7 +448,7 @@
</execution> </execution>
<execution> <execution>
<id>add-test-source</id> <id>add-test-source</id>
<phase>generate-test-sources</phase> <phase>generate-sources</phase>
<goals> <goals>
<goal>add-test-source</goal> <goal>add-test-source</goal>
</goals> </goals>

View File

@ -147,6 +147,11 @@ Release 0.23.2 - UNRELEASED
HDFS-2725. hdfs script usage information is missing the information HDFS-2725. hdfs script usage information is missing the information
about "dfs" command (Prashant Sharma via stevel) about "dfs" command (Prashant Sharma via stevel)
HDFS-2907. Add a conf property dfs.datanode.fsdataset.factory to make
FSDataset in Datanode pluggable. (szetszwo)
HDFS-2985. Improve logging when replicas are marked as corrupt. (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -174,6 +179,14 @@ Release 0.23.2 - UNRELEASED
HDFS-2969. ExtendedBlock.equals is incorrectly implemented (todd) HDFS-2969. ExtendedBlock.equals is incorrectly implemented (todd)
HDFS-2944. Typo in hdfs-default.xml causes
dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
disabled. (atm)
HDFS-2981. In hdfs-default.xml, the default value of
dfs.client.block.write.replace-datanode-on-failure.enable should be true.
(szetszwo)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -167,7 +167,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base"; public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id"; public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname"; public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts"; public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude"; public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout"; public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
@ -211,10 +210,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64; public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours"; public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
public static final String DFS_DATANODE_SIMULATEDDATASTORAGE_KEY = "dfs.datanode.simulateddatastorage";
public static final boolean DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT = false;
public static final String DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_KEY = "dfs.datanode.simulateddatastorage.capacity";
public static final long DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_DEFAULT = 2L<<40;
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy"; public static final String DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
@ -282,6 +277,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
//Keys with no defaults //Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup"; public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins"; public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";

View File

@ -88,7 +88,6 @@ public class HdfsConfiguration extends Configuration {
deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY); deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY);
deprecate("dfs.upgrade.permission", DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY); deprecate("dfs.upgrade.permission", DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY);
deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY); deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
deprecate("StorageId", DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY);
deprecate("dfs.https.client.keystore.resource", DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY); deprecate("dfs.https.client.keystore.resource", DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY);
deprecate("dfs.https.need.client.auth", DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY); deprecate("dfs.https.need.client.auth", DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY);
deprecate("slave.host.name", DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY); deprecate("slave.host.name", DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);

View File

@ -807,9 +807,11 @@ public class BlockManager {
* Mark the block belonging to datanode as corrupt * Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt * @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica * @param dn Datanode which holds the corrupt replica
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/ */
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn) throws IOException { final DatanodeInfo dn, String reason) throws IOException {
namesystem.writeLock(); namesystem.writeLock();
try { try {
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
@ -822,14 +824,15 @@ public class BlockManager {
+ blk + " not found."); + blk + " not found.");
return; return;
} }
markBlockAsCorrupt(storedBlock, dn); markBlockAsCorrupt(storedBlock, dn, reason);
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
} }
} }
private void markBlockAsCorrupt(BlockInfo storedBlock, private void markBlockAsCorrupt(BlockInfo storedBlock,
DatanodeInfo dn) throws IOException { DatanodeInfo dn,
String reason) throws IOException {
assert storedBlock != null : "storedBlock should not be null"; assert storedBlock != null : "storedBlock should not be null";
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) { if (node == null) {
@ -853,7 +856,7 @@ public class BlockManager {
node.addBlock(storedBlock); node.addBlock(storedBlock);
// Add this replica to corruptReplicas Map // Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(storedBlock, node); corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
if (countNodes(storedBlock).liveReplicas() >= inode.getReplication()) { if (countNodes(storedBlock).liveReplicas() >= inode.getReplication()) {
// the block is over-replicated so invalidate the replicas immediately // the block is over-replicated so invalidate the replicas immediately
invalidateBlock(storedBlock, node); invalidateBlock(storedBlock, node);
@ -1316,6 +1319,21 @@ public class BlockManager {
} }
} }
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
final BlockInfo blockInfo;
final String reason;
BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
super();
this.blockInfo = blockInfo;
this.reason = reason;
}
}
/** /**
* The given datanode is reporting all its blocks. * The given datanode is reporting all its blocks.
* Update the (machine-->blocklist) and (block-->machinelist) maps. * Update the (machine-->blocklist) and (block-->machinelist) maps.
@ -1369,7 +1387,7 @@ public class BlockManager {
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>(); Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
@ -1389,8 +1407,8 @@ public class BlockManager {
+ " does not belong to any file."); + " does not belong to any file.");
addToInvalidates(b, node); addToInvalidates(b, node);
} }
for (BlockInfo b : toCorrupt) { for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node); markBlockAsCorrupt(b.blockInfo, node, b.reason);
} }
} }
@ -1421,8 +1439,10 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block. // If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState(); BlockUCState ucState = storedBlock.getBlockUCState();
if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) { BlockToMarkCorrupt c = checkReplicaCorrupt(
markBlockAsCorrupt(storedBlock, node); iblk, reportedState, storedBlock, ucState, node);
if (c != null) {
markBlockAsCorrupt(c.blockInfo, node, c.reason);
continue; continue;
} }
@ -1444,7 +1464,7 @@ public class BlockManager {
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockInfo> toCorrupt, // add to corrupt replicas list Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list Collection<StatefulBlockInfo> toUC) { // add to under-construction list
// place a delimiter in the list which separates blocks // place a delimiter in the list which separates blocks
// that have been reported from those that have not // that have been reported from those that have not
@ -1507,7 +1527,7 @@ public class BlockManager {
final Block block, final ReplicaState reportedState, final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd, final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate, final Collection<Block> toInvalidate,
final Collection<BlockInfo> toCorrupt, final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) { final Collection<StatefulBlockInfo> toUC) {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -1538,8 +1558,10 @@ public class BlockManager {
return storedBlock; return storedBlock;
} }
if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) { BlockToMarkCorrupt c = checkReplicaCorrupt(
toCorrupt.add(storedBlock); block, reportedState, storedBlock, ucState, dn);
if (c != null) {
toCorrupt.add(c);
return storedBlock; return storedBlock;
} }
@ -1563,8 +1585,11 @@ public class BlockManager {
* as switch statements, on the theory that it is easier to understand * as switch statements, on the theory that it is easier to understand
* the combinatorics of reportedState and ucState that way. It should be * the combinatorics of reportedState and ucState that way. It should be
* at least as efficient as boolean expressions. * at least as efficient as boolean expressions.
*
* @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
*/ */
private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState, private BlockToMarkCorrupt checkReplicaCorrupt(
Block iblk, ReplicaState reportedState,
BlockInfo storedBlock, BlockUCState ucState, BlockInfo storedBlock, BlockUCState ucState,
DatanodeDescriptor dn) { DatanodeDescriptor dn) {
switch(reportedState) { switch(reportedState) {
@ -1572,17 +1597,31 @@ public class BlockManager {
switch(ucState) { switch(ucState) {
case COMPLETE: case COMPLETE:
case COMMITTED: case COMMITTED:
return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp() if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
|| storedBlock.getNumBytes() != iblk.getNumBytes()); return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported genstamp " +
iblk.getGenerationStamp() + " does not match " +
"genstamp in block map " + storedBlock.getGenerationStamp());
} else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported length " +
iblk.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes());
} else {
return null; // not corrupt
}
default: default:
return false; return null;
} }
case RBW: case RBW:
case RWR: case RWR:
if (!storedBlock.isComplete()) { if (!storedBlock.isComplete()) {
return false; return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) { } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
return true; return new BlockToMarkCorrupt(storedBlock,
"reported " + reportedState + " replica with genstamp " +
iblk.getGenerationStamp() + " does not match COMPLETE block's " +
"genstamp in block map " + storedBlock.getGenerationStamp());
} else { // COMPLETE block, same genstamp } else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) { if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that // If it's a RBW report for a COMPLETE block, it may just be that
@ -1592,18 +1631,22 @@ public class BlockManager {
LOG.info("Received an RBW replica for block " + storedBlock + LOG.info("Received an RBW replica for block " + storedBlock +
" on " + dn.getName() + ": ignoring it, since the block is " + " on " + dn.getName() + ": ignoring it, since the block is " +
"complete with the same generation stamp."); "complete with the same generation stamp.");
return false; return null;
} else { } else {
return true; return new BlockToMarkCorrupt(storedBlock,
"reported replica has invalid state " + reportedState);
} }
} }
case RUR: // should not be reported case RUR: // should not be reported
case TEMPORARY: // should not be reported case TEMPORARY: // should not be reported
default: default:
LOG.warn("Unexpected replica state " + reportedState String msg = "Unexpected replica state " + reportedState
+ " for block: " + storedBlock + + " for block: " + storedBlock +
" on " + dn.getName() + " size " + storedBlock.getNumBytes()); " on " + dn.getName() + " size " + storedBlock.getNumBytes();
return true; // log here at WARN level since this is really a broken HDFS
// invariant
LOG.warn(msg);
return new BlockToMarkCorrupt(storedBlock, msg);
} }
} }
@ -2134,7 +2177,7 @@ public class BlockManager {
// blockReceived reports a finalized block // blockReceived reports a finalized block
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
processReportedBlock(node, block, ReplicaState.FINALIZED, processReportedBlock(node, block, ReplicaState.FINALIZED,
toAdd, toInvalidate, toCorrupt, toUC); toAdd, toInvalidate, toCorrupt, toUC);
@ -2155,8 +2198,8 @@ public class BlockManager {
+ " does not belong to any file."); + " does not belong to any file.");
addToInvalidates(b, node); addToInvalidates(b, node);
} }
for (BlockInfo b : toCorrupt) { for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node); markBlockAsCorrupt(b.blockInfo, node, b.reason);
} }
} }

View File

@ -44,25 +44,37 @@ public class CorruptReplicasMap{
* *
* @param blk Block to be added to CorruptReplicasMap * @param blk Block to be added to CorruptReplicasMap
* @param dn DatanodeDescriptor which holds the corrupt replica * @param dn DatanodeDescriptor which holds the corrupt replica
* @param reason a textual reason (for logging purposes)
*/ */
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn) { public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason) {
Collection<DatanodeDescriptor> nodes = getNodes(blk); Collection<DatanodeDescriptor> nodes = getNodes(blk);
if (nodes == null) { if (nodes == null) {
nodes = new TreeSet<DatanodeDescriptor>(); nodes = new TreeSet<DatanodeDescriptor>();
corruptReplicasMap.put(blk, nodes); corruptReplicasMap.put(blk, nodes);
} }
String reasonText;
if (reason != null) {
reasonText = " because " + reason;
} else {
reasonText = "";
}
if (!nodes.contains(dn)) { if (!nodes.contains(dn)) {
nodes.add(dn); nodes.add(dn);
NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+ NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() + blk.getBlockName() +
" added as corrupt on " + dn.getName() + " added as corrupt on " + dn.getName() +
" by " + Server.getRemoteIp()); " by " + Server.getRemoteIp() +
reasonText);
} else { } else {
NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+ NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
"duplicate requested for " + "duplicate requested for " +
blk.getBlockName() + " to add as corrupt " + blk.getBlockName() + " to add as corrupt " +
"on " + dn.getName() + "on " + dn.getName() +
" by " + Server.getRemoteIp()); " by " + Server.getRemoteIp() +
reasonText);
} }
} }

View File

@ -43,10 +43,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
@ -162,13 +159,11 @@ import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
@ -438,13 +433,14 @@ public class DataNode extends Configured
} }
} }
private synchronized void setClusterId(String cid) throws IOException { private synchronized void setClusterId(final String nsCid, final String bpid
if(clusterId != null && !clusterId.equals(cid)) { ) throws IOException {
throw new IOException ("cluster id doesn't match. old cid=" + clusterId if(clusterId != null && !clusterId.equals(nsCid)) {
+ " new cid="+ cid); throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId
+ " but ns cid="+ nsCid + "; bpid=" + bpid);
} }
// else // else
clusterId = cid; clusterId = nsCid;
} }
private static String getHostName(Configuration config) private static String getHostName(Configuration config)
@ -847,51 +843,22 @@ public class DataNode extends Configured
*/ */
void initBlockPool(BPOfferService bpos) throws IOException { void initBlockPool(BPOfferService bpos) throws IOException {
NamespaceInfo nsInfo = bpos.getNamespaceInfo(); NamespaceInfo nsInfo = bpos.getNamespaceInfo();
Preconditions.checkState(nsInfo != null, if (nsInfo == null) {
"Block pool " + bpos + " should have retrieved " + throw new IOException("NamespaceInfo not found: Block pool " + bpos
"its namespace info before calling initBlockPool."); + " should have retrieved namespace info before initBlockPool.");
}
String blockPoolId = nsInfo.getBlockPoolID();
// Register the new block pool with the BP manager. // Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos); blockPoolManager.addBlockPool(bpos);
synchronized (this) { setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// we do not allow namenode from different cluster to register
if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
throw new IOException(
"cannot register with the namenode because clusterid do not match:"
+ " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID +
";dn cid=" + clusterId);
}
setClusterId(nsInfo.clusterID);
}
StartupOption startOpt = getStartupOption(conf);
assert startOpt != null : "Startup option must be set.";
boolean simulatedFSDataset = conf.getBoolean(
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (!simulatedFSDataset) {
// read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
dataDirs, startOpt);
StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
LOG.info("setting up storage: nsid=" +
bpStorage.getNamespaceID() + ";bpid="
+ blockPoolId + ";lv=" + storage.getLayoutVersion() +
";nsInfo=" + nsInfo);
}
// In the case that this is the first block pool to connect, initialize // In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc. // the dataset, block scanners, etc.
initFsDataSet(); initStorage(nsInfo);
initPeriodicScanners(conf); initPeriodicScanners(conf);
data.addBlockPool(blockPoolId, conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf);
} }
/** /**
@ -918,31 +885,28 @@ public class DataNode extends Configured
* Initializes the {@link #data}. The initialization is done only once, when * Initializes the {@link #data}. The initialization is done only once, when
* handshake with the the first namenode is completed. * handshake with the the first namenode is completed.
*/ */
private synchronized void initFsDataSet() throws IOException { private void initStorage(final NamespaceInfo nsInfo) throws IOException {
if (data != null) { // Already initialized final FSDatasetInterface.Factory factory
return; = FSDatasetInterface.Factory.getFactory(conf);
if (!factory.isSimulated()) {
final StartupOption startOpt = getStartupOption(conf);
if (startOpt == null) {
throw new IOException("Startup option not set.");
}
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+ ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+ ";nsInfo=" + nsInfo);
} }
// get version and id info from the name-node synchronized(this) {
boolean simulatedFSDataset = conf.getBoolean( if (data == null) {
DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, data = factory.createFSDatasetInterface(this, storage, conf);
DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
if (simulatedFSDataset) {
storage.createStorageID(getPort());
// it would have been better to pass storage as a parameter to
// constructor below - need to augment ReflectionUtils used below.
conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
try {
data = (FSDatasetInterface) ReflectionUtils.newInstance(
Class.forName(
"org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
conf);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.stringifyException(e));
} }
} else {
data = new FSDataset(this, storage, conf);
} }
} }

View File

@ -75,6 +75,16 @@ import org.apache.hadoop.util.ReflectionUtils;
***************************************************/ ***************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
class FSDataset implements FSDatasetInterface { class FSDataset implements FSDatasetInterface {
/**
* A factory for creating FSDataset objects.
*/
static class Factory extends FSDatasetInterface.Factory {
@Override
public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new FSDataset(datanode, storage, conf);
}
}
/** /**
* A node type that can be built into a tree reflecting the * A node type that can be built into a tree reflecting the
@ -1056,8 +1066,8 @@ class FSDataset implements FSDatasetInterface {
/** /**
* An FSDataset has a directory where it loads its data files. * An FSDataset has a directory where it loads its data files.
*/ */
FSDataset(DataNode datanode, DataStorage storage, Configuration conf) private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
throws IOException { ) throws IOException {
this.datanode = datanode; this.datanode = datanode;
this.maxBlocksPerDir = this.maxBlocksPerDir =
conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY, conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,

View File

@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/** /**
@ -49,6 +51,30 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface FSDatasetInterface extends FSDatasetMBean { public interface FSDatasetInterface extends FSDatasetMBean {
/**
* A factory for creating FSDatasetInterface objects.
*/
public abstract class Factory {
/** @return the configured factory. */
public static Factory getFactory(Configuration conf) {
final Class<? extends Factory> clazz = conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
FSDataset.Factory.class,
Factory.class);
return ReflectionUtils.newInstance(clazz, conf);
}
/** Create a FSDatasetInterface object. */
public abstract FSDatasetInterface createFSDatasetInterface(
DataNode datanode, DataStorage storage, Configuration conf
) throws IOException;
/** Does the factory create simulated objects? */
public boolean isSimulated() {
return false;
}
}
/** /**
* This is an interface for the underlying volume. * This is an interface for the underlying volume.
* @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume * @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume

View File

@ -550,7 +550,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
DatanodeInfo[] nodes = blocks[i].getLocations(); DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) { for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j]; DatanodeInfo dn = nodes[j];
namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn); namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn,
"client machine reported it");
} }
} }
} }

View File

@ -347,7 +347,7 @@ creations/deletions), or "all".</description>
<property> <property>
<name>dfs.client.block.write.replace-datanode-on-failure.enable</name> <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
<value>ture</value> <value>true</value>
<description> <description>
If there is a datanode/network failure in the write pipeline, If there is a datanode/network failure in the write pipeline,
DFSClient will try to remove the failed datanode from the pipeline DFSClient will try to remove the failed datanode from the pipeline
@ -355,7 +355,7 @@ creations/deletions), or "all".</description>
the number of datanodes in the pipeline is decreased. The feature is the number of datanodes in the pipeline is decreased. The feature is
to add new datanodes to the pipeline. to add new datanodes to the pipeline.
This is a site-wise property to enable/disable the feature. This is a site-wide property to enable/disable the feature.
See also dfs.client.block.write.replace-datanode-on-failure.policy See also dfs.client.block.write.replace-datanode-on-failure.policy
</description> </description>

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog; import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
@ -122,10 +123,9 @@ public class DataNodeCluster {
} }
dataNodeDirs = args[i]; dataNodeDirs = args[i];
} else if (args[i].equals("-simulated")) { } else if (args[i].equals("-simulated")) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} else if (args[i].equals("-inject")) { } else if (args[i].equals("-inject")) {
if (!conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) {
false) ) {
System.out.print("-inject is valid only for simulated"); System.out.print("-inject is valid only for simulated");
printUsageExit(); printUsageExit();
} }
@ -158,7 +158,7 @@ public class DataNodeCluster {
System.exit(-1); System.exit(-1);
} }
boolean simulated = boolean simulated =
conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, false); FSDatasetInterface.Factory.getFactory(conf).isSimulated();
System.out.println("Starting " + numDataNodes + System.out.println("Starting " + numDataNodes +
(simulated ? " Simulated " : " ") + (simulated ? " Simulated " : " ") +
" Data Nodes that will connect to Name Node at " + nameNodeAdr); " Data Nodes that will connect to Name Node at " + nameNodeAdr);

View File

@ -842,7 +842,7 @@ public class MiniDFSCluster {
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs); conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
} }
if (simulatedCapacities != null) { if (simulatedCapacities != null) {
dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(dnConf);
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]); simulatedCapacities[i-curDatanodesNum]);
} }

View File

@ -107,7 +107,7 @@ public class TestFileAppend{
public void testCopyOnWrite() throws IOException { public void testCopyOnWrite() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
@ -178,7 +178,7 @@ public class TestFileAppend{
public void testSimpleFlush() throws IOException { public void testSimpleFlush() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -234,7 +234,7 @@ public class TestFileAppend{
public void testComplexFlush() throws IOException { public void testComplexFlush() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -283,7 +283,7 @@ public class TestFileAppend{
public void testFileNotFound() throws IOException { public void testFileNotFound() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();

View File

@ -82,7 +82,7 @@ public class TestFileAppend2 extends TestCase {
public void testSimpleAppend() throws IOException { public void testSimpleAppend() throws IOException {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50); conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
conf.setBoolean("dfs.support.append", true); conf.setBoolean("dfs.support.append", true);

View File

@ -77,7 +77,7 @@ public class TestFileAppend4 {
public void setUp() throws Exception { public void setUp() throws Exception {
this.conf = new Configuration(); this.conf = new Configuration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);

View File

@ -147,7 +147,7 @@ public class TestFileCorruption extends TestCase {
DatanodeRegistration dnR = DatanodeRegistration dnR =
DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId()); DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId());
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt( cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR)); blk, new DatanodeInfo(dnR), "TEST");
// open the file // open the file
fs.open(FILE_PATH); fs.open(FILE_PATH);

View File

@ -144,7 +144,7 @@ public class TestFileCreation extends junit.framework.TestCase {
public void testFileCreation() throws IOException { public void testFileCreation() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
@ -223,7 +223,7 @@ public class TestFileCreation extends junit.framework.TestCase {
public void testDeleteOnExit() throws IOException { public void testDeleteOnExit() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
@ -287,7 +287,7 @@ public class TestFileCreation extends junit.framework.TestCase {
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
// create cluster // create cluster
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -361,7 +361,7 @@ public class TestFileCreation extends junit.framework.TestCase {
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
// create cluster // create cluster
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@ -460,7 +460,7 @@ public class TestFileCreation extends junit.framework.TestCase {
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
// create cluster // create cluster
@ -599,7 +599,7 @@ public class TestFileCreation extends junit.framework.TestCase {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
System.out.println("Testing adbornal client death."); System.out.println("Testing adbornal client death.");
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();
@ -634,7 +634,7 @@ public class TestFileCreation extends junit.framework.TestCase {
public void testFileCreationNonRecursive() throws IOException { public void testFileCreationNonRecursive() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();

View File

@ -136,7 +136,7 @@ public class TestInjectionForSimulatedStorage extends TestCase {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes)); conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize);
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
//first time format //first time format
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive(); cluster.waitActive();
@ -159,7 +159,7 @@ public class TestInjectionForSimulatedStorage extends TestCase {
LOG.info("Restarting minicluster"); LOG.info("Restarting minicluster");
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)

View File

@ -174,7 +174,7 @@ public class TestLargeBlock {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();

View File

@ -206,7 +206,7 @@ public class TestPread extends TestCase {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096); conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean("dfs.datanode.simulateddatastorage", true); SimulatedFSDataset.setFactory(conf);
} }
if (disableTransferTo) { if (disableTransferTo) {
conf.setBoolean("dfs.datanode.transferTo.allowed", false); conf.setBoolean("dfs.datanode.transferTo.allowed", false);

View File

@ -199,7 +199,7 @@ public class TestReplication extends TestCase {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
if (simulated) { if (simulated) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes) .numDataNodes(numDatanodes)

View File

@ -28,7 +28,7 @@ public class TestSetrepIncreasing extends TestCase {
static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException { static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "" + fromREP); conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "" + fromREP);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);

View File

@ -124,7 +124,7 @@ public class TestShortCircuitLocalRead {
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName()); UserGroupInformation.getCurrentUser().getShortUserName());
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build(); .format(true).build();
@ -248,7 +248,7 @@ public class TestShortCircuitLocalRead {
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName()); UserGroupInformation.getCurrentUser().getShortUserName());
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build(); .format(true).build();

View File

@ -92,7 +92,7 @@ public class TestSmallBlock extends TestCase {
public void testSmallBlock() throws IOException { public void testSmallBlock() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean("dfs.datanode.simulateddatastorage", true); SimulatedFSDataset.setFactory(conf);
} }
conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1"); conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

View File

@ -77,7 +77,7 @@ public class TestBalancer extends TestCase {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
} }

View File

@ -83,14 +83,14 @@ public class TestCorruptReplicaInfo extends TestCase {
DatanodeDescriptor dn1 = new DatanodeDescriptor(); DatanodeDescriptor dn1 = new DatanodeDescriptor();
DatanodeDescriptor dn2 = new DatanodeDescriptor(); DatanodeDescriptor dn2 = new DatanodeDescriptor();
crm.addToCorruptReplicasMap(getBlock(0), dn1); crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST");
assertEquals("Number of corrupt blocks not returning correctly", assertEquals("Number of corrupt blocks not returning correctly",
1, crm.size()); 1, crm.size());
crm.addToCorruptReplicasMap(getBlock(1), dn1); crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST");
assertEquals("Number of corrupt blocks not returning correctly", assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size()); 2, crm.size());
crm.addToCorruptReplicasMap(getBlock(1), dn2); crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST");
assertEquals("Number of corrupt blocks not returning correctly", assertEquals("Number of corrupt blocks not returning correctly",
2, crm.size()); 2, crm.size());
@ -103,7 +103,7 @@ public class TestCorruptReplicaInfo extends TestCase {
0, crm.size()); 0, crm.size());
for (Long block_id: block_ids) { for (Long block_id: block_ids) {
crm.addToCorruptReplicasMap(getBlock(block_id), dn1); crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST");
} }
assertEquals("Number of corrupt blocks not returning correctly", assertEquals("Number of corrupt blocks not returning correctly",

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -32,7 +31,6 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.StandardMBean; import javax.management.StandardMBean;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -63,21 +61,33 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
* *
* Note the synchronization is coarse grained - it is at each method. * Note the synchronization is coarse grained - it is at each method.
*/ */
public class SimulatedFSDataset implements FSDatasetInterface {
static class Factory extends FSDatasetInterface.Factory {
@Override
public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new SimulatedFSDataset(datanode, storage, conf);
}
public class SimulatedFSDataset implements FSDatasetInterface, Configurable{ @Override
public boolean isSimulated() {
return true;
}
}
public static void setFactory(Configuration conf) {
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
Factory.class.getName());
}
public static final String CONFIG_PROPERTY_SIMULATED =
"dfs.datanode.simulateddatastorage";
public static final String CONFIG_PROPERTY_CAPACITY = public static final String CONFIG_PROPERTY_CAPACITY =
"dfs.datanode.simulateddatastorage.capacity"; "dfs.datanode.simulateddatastorage.capacity";
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
public static final byte DEFAULT_DATABYTE = 9; // 1 terabyte public static final byte DEFAULT_DATABYTE = 9;
byte simulatedDataByte = DEFAULT_DATABYTE;
Configuration conf = null;
static byte[] nullCrcFileData; static final byte[] nullCrcFileData;
{ static {
DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum. DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
CHECKSUM_NULL, 16*1024 ); CHECKSUM_NULL, 16*1024 );
byte[] nullCrcHeader = checksum.getHeader(); byte[] nullCrcHeader = checksum.getHeader();
@ -360,31 +370,22 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
} }
} }
private Map<String, Map<Block, BInfo>> blockMap = null; private final Map<String, Map<Block, BInfo>> blockMap
private SimulatedStorage storage = null; = new HashMap<String, Map<Block,BInfo>>();
private String storageId; private final SimulatedStorage storage;
private final String storageId;
public SimulatedFSDataset(Configuration conf) throws IOException { public SimulatedFSDataset(DataNode datanode, DataStorage storage,
setConf(conf); Configuration conf) {
if (storage != null) {
storage.createStorageID(datanode.getPort());
this.storageId = storage.getStorageID();
} else {
this.storageId = "unknownStorageId" + new Random().nextInt();
} }
// Constructor used for constructing the object using reflection
@SuppressWarnings("unused")
private SimulatedFSDataset() { // real construction when setConf called..
}
public Configuration getConf() {
return conf;
}
public void setConf(Configuration iconf) {
conf = iconf;
storageId = conf.get(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, "unknownStorageId" +
new Random().nextInt());
registerMBean(storageId); registerMBean(storageId);
storage = new SimulatedStorage( this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
blockMap = new HashMap<String, Map<Block,BInfo>>();
} }
public synchronized void injectBlocks(String bpid, public synchronized void injectBlocks(String bpid,
@ -441,23 +442,16 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
@Override @Override
public synchronized BlockListAsLongs getBlockReport(String bpid) { public synchronized BlockListAsLongs getBlockReport(String bpid) {
final List<Block> blocks = new ArrayList<Block>();
final Map<Block, BInfo> map = blockMap.get(bpid); final Map<Block, BInfo> map = blockMap.get(bpid);
Block[] blockTable = new Block[map.size()];
if (map != null) { if (map != null) {
int count = 0;
for (BInfo b : map.values()) { for (BInfo b : map.values()) {
if (b.isFinalized()) { if (b.isFinalized()) {
blockTable[count++] = b.theBlock; blocks.add(b.theBlock);
} }
} }
if (count != blockTable.length) {
blockTable = Arrays.copyOf(blockTable, count);
} }
} else { return new BlockListAsLongs(blocks, null);
blockTable = new Block[0];
}
return new BlockListAsLongs(
new ArrayList<Block>(Arrays.asList(blockTable)), null);
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean

View File

@ -34,7 +34,7 @@ public class TestDataNodeMetrics extends TestCase {
public void testDataNodeMetrics() throws Exception { public void testDataNodeMetrics() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try { try {
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();

View File

@ -45,7 +45,7 @@ public class TestSimulatedFSDataset extends TestCase {
protected void setUp() throws Exception { protected void setUp() throws Exception {
super.setUp(); super.setUp();
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
@ -87,6 +87,18 @@ public class TestSimulatedFSDataset extends TestCase {
return addSomeBlocks(fsdataset, 1); return addSomeBlocks(fsdataset, 1);
} }
public void testFSDatasetFactory() {
final Configuration conf = new Configuration();
FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
assertEquals(FSDataset.Factory.class, f.getClass());
assertFalse(f.isSimulated());
SimulatedFSDataset.setFactory(conf);
FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf);
assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
assertTrue(s.isSimulated());
}
public void testGetMetaData() throws IOException { public void testGetMetaData() throws IOException {
FSDatasetInterface fsdataset = getSimulatedFSDataset(); FSDatasetInterface fsdataset = getSimulatedFSDataset();
ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0);
@ -287,8 +299,8 @@ public class TestSimulatedFSDataset extends TestCase {
} }
} }
private SimulatedFSDataset getSimulatedFSDataset() throws IOException { private SimulatedFSDataset getSimulatedFSDataset() {
SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, null, conf);
fsdataset.addBlockPool(bpid, conf); fsdataset.addBlockPool(bpid, conf);
return fsdataset; return fsdataset;
} }

View File

@ -83,7 +83,7 @@ public class TestFileLimit extends TestCase {
int currentNodes = 0; int currentNodes = 0;
if (simulatedStorage) { if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); SimulatedFSDataset.setFactory(conf);
} }
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem(); FileSystem fs = cluster.getFileSystem();

View File

@ -174,7 +174,8 @@ public class TestNameNodeMetrics {
// Corrupt first replica of the block // Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations( LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0); cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
updateMetrics(); updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS); MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb); assertGauge("CorruptBlocks", 1L, rb);
@ -213,7 +214,8 @@ public class TestNameNodeMetrics {
// Corrupt the only replica of the block to result in a missing block // Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations( LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0); cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]); bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"TEST");
updateMetrics(); updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS); MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb); assertGauge("UnderReplicatedBlocks", 1L, rb);

View File

@ -70,6 +70,11 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it
to the maven build. (Ravi Prakash via vinodkv) to the maven build. (Ravi Prakash via vinodkv)
MAPREDUCE-3884. PWD should be first in the classpath of MR tasks (tucu)
MAPREDUCE-3878. Null user on filtered jobhistory job page (Jonathon Eagles
via tgraves)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17
NEW FEATURES NEW FEATURES

View File

@ -343,9 +343,15 @@ public class AppController extends Controller implements AMParams {
* @return True if the requesting user has permission to view the job * @return True if the requesting user has permission to view the job
*/ */
boolean checkAccess(Job job) { boolean checkAccess(Job job) {
UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser( String remoteUser = request().getRemoteUser();
request().getRemoteUser()); UserGroupInformation callerUGI = null;
return job.checkAccess(callerUgi, JobACL.VIEW_JOB); if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
return false;
}
return true;
} }
/** /**

View File

@ -230,6 +230,9 @@ public class MRApps extends Apps {
boolean userClassesTakesPrecedence = boolean userClassesTakesPrecedence =
conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false); conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
Apps.addToEnvironment(environment,
Environment.CLASSPATH.name(),
Environment.PWD.$());
if (!userClassesTakesPrecedence) { if (!userClassesTakesPrecedence) {
MRApps.setMRFrameworkClasspath(environment, conf); MRApps.setMRFrameworkClasspath(environment, conf);
} }

View File

@ -130,7 +130,7 @@ public class TestMRApps {
Job job = Job.getInstance(); Job job = Job.getInstance();
Map<String, String> environment = new HashMap<String, String>(); Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration()); MRApps.setClasspath(environment, job.getConfiguration());
assertEquals("$HADOOP_CONF_DIR:" + assertEquals("$PWD:$HADOOP_CONF_DIR:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/*:" + "$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
"$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" + "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" + "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
@ -152,7 +152,7 @@ public class TestMRApps {
} }
String env_str = env.get("CLASSPATH"); String env_str = env.get("CLASSPATH");
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!", assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
env_str.indexOf("job.jar"), 0); env_str.indexOf("$PWD:job.jar"), 0);
} }
@Test public void testSetClasspathWithNoUserPrecendence() { @Test public void testSetClasspathWithNoUserPrecendence() {
@ -166,7 +166,7 @@ public class TestMRApps {
} }
String env_str = env.get("CLASSPATH"); String env_str = env.get("CLASSPATH");
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!", assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
env_str.indexOf("job.jar"), 0); env_str.indexOf("$PWD:job.jar"), 0);
} }
} }