Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1227260 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-01-04 18:47:51 +00:00
commit cd81cc6664
27 changed files with 369 additions and 84 deletions

View File

@ -202,6 +202,9 @@ Release 0.23.1 - Unreleased
HADOOP-7504. Add the missing Ganglia31 opts to hadoop-metrics.properties as a comment. (harsh)
HADOOP-7933. Add a getDelegationTokens api to FileSystem which checks
for known tokens in the passed Credentials object. (sseth)
OPTIMIZATIONS
BUG FIXES
@ -235,6 +238,9 @@ Release 0.23.1 - Unreleased
HADOOP-7837. no NullAppender in the log4j config. (eli)
HADOOP-7948. Shell scripts created by hadoop-dist/pom.xml to build tar do not
properly propagate failure. (cim_michajlomatijkiw via tucu)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -628,8 +628,11 @@
<tr>
<td>conf/hdfs-site.xml</td>
<td>dfs.blocksize</td>
<td>134217728</td>
<td>HDFS blocksize of 128MB for large file-systems.</td>
<td>128m</td>
<td>
HDFS blocksize of 128 MB for large file-systems. Sizes can be provided
in size-prefixed values (10k, 128m, 1g, etc.) or simply in bytes (134217728 for 128 MB, etc.).
</td>
</tr>
<tr>
<td>conf/hdfs-site.xml</td>

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -393,6 +394,40 @@ public abstract class FileSystem extends Configured implements Closeable {
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
return new ArrayList<Token<?>>(0);
}
/**
* @see #getDelegationTokens(String)
* This is similar to getDelegationTokens, with the added restriction that if
* a token is already present in the passed Credentials object - that token
* is returned instead of a new delegation token.
*
* If the token is found to be cached in the Credentials object, this API does
* not verify the token validity or the passed in renewer.
*
*
* @param renewer the account name that is allowed to renew the token.
* @param credentials a Credentials object containing already knowing
* delegationTokens.
* @return a list of delegation tokens.
* @throws IOException
*/
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
public List<Token<?>> getDelegationTokens(String renewer,
Credentials credentials) throws IOException {
List<Token<?>> allTokens = getDelegationTokens(renewer);
List<Token<?>> newTokens = new ArrayList<Token<?>>();
if (allTokens != null) {
for (Token<?> token : allTokens) {
Token<?> knownToken = credentials.getToken(token.getService());
if (knownToken == null) {
newTokens.add(token);
} else {
newTokens.add(knownToken);
}
}
}
return newTokens;
}
/** create a file with the provided permission
* The permission of the file is set to be the provided permission as in

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
@ -388,4 +389,11 @@ public class FilterFileSystem extends FileSystem {
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
return fs.getDelegationTokens(renewer);
}
}
@Override
// FileSystem
public List<Token<?>> getDelegationTokens(String renewer,
Credentials credentials) throws IOException {
return fs.getDelegationTokens(renewer, credentials);
}
}

View File

@ -24,7 +24,9 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.Map.Entry;
@ -45,7 +47,9 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.InodeTree.INode;
import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
@ -495,7 +499,40 @@ public class ViewFileSystem extends FileSystem {
}
return result;
}
@Override
public List<Token<?>> getDelegationTokens(String renewer,
Credentials credentials) throws IOException {
List<InodeTree.MountPoint<FileSystem>> mountPoints =
fsState.getMountPoints();
int initialListSize = 0;
for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
initialListSize += im.target.targetDirLinkList.length;
}
Set<String> seenServiceNames = new HashSet<String>();
List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
for (int i = 0; i < mountPoints.size(); ++i) {
String serviceName =
mountPoints.get(i).target.targetFileSystem.getCanonicalServiceName();
if (seenServiceNames.contains(serviceName)) {
continue;
}
seenServiceNames.add(serviceName);
Token<?> knownToken = credentials.getToken(new Text(serviceName));
if (knownToken != null) {
result.add(knownToken);
} else {
List<Token<?>> tokens =
mountPoints.get(i).target.targetFileSystem
.getDelegationTokens(renewer);
if (tokens != null) {
result.addAll(tokens);
}
}
}
return result;
}
/*
* An instance of this class represents an internal dir of the viewFs
* that is internal dir of the mount table.

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.junit.After;
import org.junit.Assert;
@ -89,6 +90,16 @@ public class ViewFileSystemBaseTest {
// Set up the defaultMT in the config with our mount point links
//Configuration conf = new Configuration();
conf = ViewFileSystemTestSetup.configWithViewfsScheme();
setupMountPoints();
fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
}
@After
public void tearDown() throws Exception {
fsTarget.delete(FileSystemTestHelper.getTestRootPath(fsTarget), true);
}
void setupMountPoints() {
ConfigUtil.addLink(conf, "/user", new Path(targetTestRoot,"user").toUri());
ConfigUtil.addLink(conf, "/user2", new Path(targetTestRoot,"user").toUri());
ConfigUtil.addLink(conf, "/data", new Path(targetTestRoot,"data").toUri());
@ -100,20 +111,17 @@ public class ViewFileSystemBaseTest {
new Path(targetTestRoot,"missingTarget").toUri());
ConfigUtil.addLink(conf, "/linkToAFile",
new Path(targetTestRoot,"aFile").toUri());
fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
}
@After
public void tearDown() throws Exception {
fsTarget.delete(FileSystemTestHelper.getTestRootPath(fsTarget), true);
}
@Test
public void testGetMountPoints() {
ViewFileSystem viewfs = (ViewFileSystem) fsView;
MountPoint[] mountPoints = viewfs.getMountPoints();
Assert.assertEquals(7, mountPoints.length);
Assert.assertEquals(getExpectedMountPoints(), mountPoints.length);
}
int getExpectedMountPoints() {
return 7;
}
/**
@ -125,9 +133,46 @@ public class ViewFileSystemBaseTest {
public void testGetDelegationTokens() throws IOException {
List<Token<?>> delTokens =
fsView.getDelegationTokens("sanjay");
Assert.assertEquals(0, delTokens.size());
Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.size());
}
int getExpectedDelegationTokenCount() {
return 0;
}
@Test
public void testGetDelegationTokensWithCredentials() throws IOException {
Credentials credentials = new Credentials();
List<Token<?>> delTokens =
fsView.getDelegationTokens("sanjay", credentials);
int expectedTokenCount = getExpectedDelegationTokenCountWithCredentials();
Assert.assertEquals(expectedTokenCount, delTokens.size());
for (int i = 0; i < expectedTokenCount / 2; i++) {
Token<?> token = delTokens.get(i);
credentials.addToken(token.getService(), token);
}
List<Token<?>> delTokens2 =
fsView.getDelegationTokens("sanjay", credentials);
Assert.assertEquals(expectedTokenCount, delTokens2.size());
for (int i = 0; i < delTokens2.size(); i++) {
for (int j = 0; j < delTokens.size(); j++) {
if (delTokens.get(j) == delTokens2.get(i)) {
delTokens.remove(j);
break;
}
}
}
Assert.assertEquals(expectedTokenCount / 2, delTokens.size());
}
int getExpectedDelegationTokenCountWithCredentials() {
return 0;
}
@Test
public void testBasicPaths() {
Assert.assertEquals(FsConstants.VIEWFS_URI,
@ -340,7 +385,7 @@ public class ViewFileSystemBaseTest {
FileStatus[] dirPaths = fsView.listStatus(new Path("/"));
FileStatus fs;
Assert.assertEquals(6, dirPaths.length);
Assert.assertEquals(getExpectedDirPaths(), dirPaths.length);
fs = FileSystemTestHelper.containsPath(fsView, "/user", dirPaths);
Assert.assertNotNull(fs);
Assert.assertTrue("A mount should appear as symlink", fs.isSymlink());
@ -372,6 +417,10 @@ public class ViewFileSystemBaseTest {
Assert.assertTrue("A mount should appear as symlink", fs.isSymlink());
}
int getExpectedDirPaths() {
return 6;
}
@Test
public void testListOnMountTargetDirs() throws IOException {
FileStatus[] dirPaths = fsView.listStatus(new Path("/data"));

View File

@ -98,11 +98,12 @@
run() {
echo "\$ ${@}"
"${@}"
if [ $? != 0 ]; then
res=$?
if [ $res != 0 ]; then
echo
echo "Failed!"
echo
exit $?
exit $res
fi
}
@ -139,11 +140,12 @@
run() {
echo "\$ ${@}"
"${@}"
if [ $? != 0 ]; then
res=$?
if [ $res != 0 ]; then
echo
echo "Failed!"
echo
exit $?
exit $res
fi
}

View File

@ -121,6 +121,8 @@ Trunk (unreleased changes)
HDFS-2729. Update BlockManager's comments regarding the invalid block set (harsh)
HDFS-1314. Make dfs.blocksize accept size-indicating prefixes (Sho Shimauchi via harsh)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File

@ -197,7 +197,7 @@ public class DFSClient implements java.io.Closeable {
/** dfs.write.packet.size is an internal config variable */
writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
defaultBlockSize = conf.getLong(DFS_BLOCK_SIZE_KEY,
defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
DFS_BLOCK_SIZE_DEFAULT);
defaultReplication = (short) conf.getInt(
DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT);

View File

@ -119,7 +119,7 @@ class DataXceiverServer implements Runnable {
conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
//set up parameter for cluster balancing

View File

@ -714,7 +714,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
(short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),

View File

@ -529,7 +529,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public long getDefaultBlockSize() {
return getConf().getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
return getConf().getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
}

View File

@ -55,6 +55,6 @@ public class BlockSizeParam extends LongParam {
/** @return the value or, if it is null, return the default from conf. */
public long getValue(final Configuration conf) {
return getValue() != null? getValue()
: conf.getLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
: conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
}
}

View File

@ -341,7 +341,12 @@ creations/deletions), or "all".</description>
<property>
<name>dfs.blocksize</name>
<value>67108864</value>
<description>The default block size for new files.</description>
<description>
The default block size for new files, in bytes.
You can use the following suffix (case insensitive):
k(kilo), m(mega), g(giga), t(tera), p(peta), e(exa) to specify the size (such as 128k, 512m, 1g, etc.),
Or provide complete size in bytes (such as 134217728 for 128 MB).
</description>
</property>
<property>

View File

@ -51,7 +51,7 @@ public class TestParam {
final BlockSizeParam p = new BlockSizeParam(BlockSizeParam.DEFAULT);
Assert.assertEquals(null, p.getValue());
Assert.assertEquals(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT),
p.getValue(conf));

View File

@ -52,6 +52,9 @@ Trunk (unreleased changes)
MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh)
BUG FIXES
MAPREDUCE-3462. Fix Gridmix JUnit testcase failures.
(Ravi Prakash and Ravi Gummadi via amarrk)
MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks.
(Devaraj K and Amar Kamat via amarrk)
@ -175,6 +178,8 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3547. Added a bunch of unit tests for the the RM/NM webservices.
(Thomas Graves via acmurthy)
MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -384,6 +389,12 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3608. Fixed compile issue with MAPREDUCE-3522. (mahadev via
acmurthy)
MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce
ramp up. (Sharad Agarwal and Arun C Murthy via vinodkv)
MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when
adding a file/archive to the path. (Dick King via tucu)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -858,8 +858,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
int sysMaxReduces = 1;
long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
fs.getDefaultBlockSize()); // FIXME: this is wrong; get FS from
// [File?]InputFormat and default block size
// from that
long sysMemSizeForUberSlot =
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,

View File

@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
@ -122,8 +123,6 @@ public class RMContainerAllocator extends RMContainerRequestor
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
private int reduceResourceReqt;//memory
private int completedMaps = 0;
private int completedReduces = 0;
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
@ -169,7 +168,13 @@ public class RMContainerAllocator extends RMContainerRequestor
if (recalculateReduceSchedule) {
preemptReducesIfNeeded();
scheduleReduces();
scheduleReduces(
getJob().getTotalMaps(), getJob().getCompletedMaps(),
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
}
}
@ -180,6 +185,14 @@ public class RMContainerAllocator extends RMContainerRequestor
LOG.info("Final Stats: " + getStat());
}
public boolean getIsReduceStarted() {
return reduceStarted;
}
public void setIsReduceStarted(boolean reduceStarted) {
this.reduceStarted = reduceStarted;
}
@SuppressWarnings("unchecked")
@Override
public synchronized void handle(ContainerAllocatorEvent event) {
@ -319,10 +332,17 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
}
private void scheduleReduces() {
@Private
public void scheduleReduces(
int totalMaps, int completedMaps,
int scheduledMaps, int scheduledReduces,
int assignedMaps, int assignedReduces,
int mapResourceReqt, int reduceResourceReqt,
int numPendingReduces,
float maxReduceRampupLimit, float reduceSlowStart) {
if (pendingReduces.size() == 0) {
if (numPendingReduces == 0) {
return;
}
@ -330,29 +350,25 @@ public class RMContainerAllocator extends RMContainerRequestor
//if all maps are assigned, then ramp up all reduces irrespective of the
//headroom
if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) {
LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size());
for (ContainerRequest req : pendingReduces) {
scheduledRequests.addReduce(req);
}
pendingReduces.clear();
if (scheduledMaps == 0 && numPendingReduces > 0) {
LOG.info("All maps assigned. " +
"Ramping up all remaining reduces:" + numPendingReduces);
scheduleAllReduces();
return;
}
int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
//check for slow start
if (!reduceStarted) {//not set yet
if (!getIsReduceStarted()) {//not set yet
int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
totalMaps);
if(completedMaps < completedMapsForReduceSlowstart) {
LOG.info("Reduce slow start threshold not met. " +
"completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
"completedMapsForReduceSlowstart " +
completedMapsForReduceSlowstart);
return;
} else {
LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
reduceStarted = true;
setIsReduceStarted(true);
}
}
@ -363,20 +379,21 @@ public class RMContainerAllocator extends RMContainerRequestor
completedMapPercent = 1;
}
int netScheduledMapMem = scheduledRequests.maps.size() * mapResourceReqt
+ assignedRequests.maps.size() * mapResourceReqt;
int netScheduledMapMem =
(scheduledMaps + assignedMaps) * mapResourceReqt;
int netScheduledReduceMem = scheduledRequests.reduces.size()
* reduceResourceReqt + assignedRequests.reduces.size()
* reduceResourceReqt;
int netScheduledReduceMem =
(scheduledReduces + assignedReduces) * reduceResourceReqt;
int finalMapMemLimit = 0;
int finalReduceMemLimit = 0;
// ramp up the reduces based on completed map percentage
int totalMemLimit = getMemLimit();
int idealReduceMemLimit = Math.min((int)(completedMapPercent * totalMemLimit),
(int) (maxReduceRampupLimit * totalMemLimit));
int idealReduceMemLimit =
Math.min(
(int)(completedMapPercent * totalMemLimit),
(int) (maxReduceRampupLimit * totalMemLimit));
int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
// check if there aren't enough maps scheduled, give the free map capacity
@ -397,29 +414,46 @@ public class RMContainerAllocator extends RMContainerRequestor
" netScheduledMapMem:" + netScheduledMapMem +
" netScheduledReduceMem:" + netScheduledReduceMem);
int rampUp = (finalReduceMemLimit - netScheduledReduceMem)
/ reduceResourceReqt;
int rampUp =
(finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
if (rampUp > 0) {
rampUp = Math.min(rampUp, pendingReduces.size());
rampUp = Math.min(rampUp, numPendingReduces);
LOG.info("Ramping up " + rampUp);
//more reduce to be scheduled
for (int i = 0; i < rampUp; i++) {
ContainerRequest request = pendingReduces.removeFirst();
scheduledRequests.addReduce(request);
}
rampUpReduces(rampUp);
} else if (rampUp < 0){
int rampDown = -1 * rampUp;
rampDown = Math.min(rampDown, scheduledRequests.reduces.size());
rampDown = Math.min(rampDown, scheduledReduces);
LOG.info("Ramping down " + rampDown);
//remove from the scheduled and move back to pending
for (int i = 0; i < rampDown; i++) {
ContainerRequest request = scheduledRequests.removeReduce();
pendingReduces.add(request);
}
rampDownReduces(rampDown);
}
}
private void scheduleAllReduces() {
for (ContainerRequest req : pendingReduces) {
scheduledRequests.addReduce(req);
}
pendingReduces.clear();
}
@Private
public void rampUpReduces(int rampUp) {
//more reduce to be scheduled
for (int i = 0; i < rampUp; i++) {
ContainerRequest request = pendingReduces.removeFirst();
scheduledRequests.addReduce(request);
}
}
@Private
public void rampDownReduces(int rampDown) {
//remove from the scheduled and move back to pending
for (int i = 0; i < rampDown; i++) {
ContainerRequest request = scheduledRequests.removeReduce();
pendingReduces.add(request);
}
}
/**
* Synchronized to avoid findbugs warnings
*/
@ -429,8 +463,8 @@ public class RMContainerAllocator extends RMContainerRequestor
" ScheduledReduces:" + scheduledRequests.reduces.size() +
" AssignedMaps:" + assignedRequests.maps.size() +
" AssignedReduces:" + assignedRequests.reduces.size() +
" completedMaps:" + completedMaps +
" completedReduces:" + completedReduces +
" completedMaps:" + getJob().getCompletedMaps() +
" completedReduces:" + getJob().getCompletedReduces() +
" containersAllocated:" + containersAllocated +
" containersReleased:" + containersReleased +
" hostLocalAssigned:" + hostLocalAssigned +
@ -497,11 +531,7 @@ public class RMContainerAllocator extends RMContainerRequestor
+ cont.getContainerId());
} else {
assignedRequests.remove(attemptID);
if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) {
completedMaps++;
} else {
completedReduces++;
}
// send the container completed event to Task attempt
eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@ -514,7 +544,8 @@ public class RMContainerAllocator extends RMContainerRequestor
return newContainers;
}
private int getMemLimit() {
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
return headRoom + assignedRequests.maps.size() * mapResourceReqt +
assignedRequests.reduces.size() * reduceResourceReqt;

View File

@ -19,8 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
@ -1218,6 +1217,70 @@ public class TestRMContainerAllocator {
}
@Test
public void testReduceScheduling() throws Exception {
int totalMaps = 10;
int succeededMaps = 1;
int scheduledMaps = 10;
int scheduledReduces = 0;
int assignedMaps = 2;
int assignedReduces = 0;
int mapResourceReqt = 1024;
int reduceResourceReqt = 2*1024;
int numPendingReduces = 4;
float maxReduceRampupLimit = 0.5f;
float reduceSlowStart = 0.2f;
RMContainerAllocator allocator = mock(RMContainerAllocator.class);
doCallRealMethod().when(allocator).
scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(),
anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
// Test slow-start
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, never()).setIsReduceStarted(true);
succeededMaps = 3;
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator, times(1)).setIsReduceStarted(true);
// Test reduce ramp-up
doReturn(100 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampUpReduces(anyInt());
verify(allocator, never()).rampDownReduces(anyInt());
// Test reduce ramp-down
scheduledReduces = 3;
doReturn(10 * 1024).when(allocator).getMemLimit();
allocator.scheduleReduces(
totalMaps, succeededMaps,
scheduledMaps, scheduledReduces,
assignedMaps, assignedReduces,
mapResourceReqt, reduceResourceReqt,
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
}
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();

View File

@ -1030,7 +1030,7 @@ public class Job extends JobContextImpl implements JobContext {
public void addFileToClassPath(Path file)
throws IOException {
ensureState(JobState.DEFINE);
DistributedCache.addFileToClassPath(file, conf);
DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
}
/**
@ -1045,7 +1045,7 @@ public class Job extends JobContextImpl implements JobContext {
public void addArchiveToClassPath(Path archive)
throws IOException {
ensureState(JobState.DEFINE);
DistributedCache.addArchiveToClassPath(archive, conf);
DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
}
/**

View File

@ -269,7 +269,7 @@ public class DistributedCache {
/**
* Add an file path to the current set of classpath entries It adds the file
* to cache as well. Intended to be used by user code.
*
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @deprecated Use {@link Job#addFileToClassPath(Path)} instead
@ -277,12 +277,25 @@ public class DistributedCache {
@Deprecated
public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
addFileToClassPath(file, conf, file.getFileSystem(conf));
}
/**
* Add a file path to the current set of classpath entries. It adds the file
* to cache as well. Intended to be used by user code.
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
* @param fs FileSystem with respect to which {@code archivefile} should
* be interpreted.
*/
public static void addFileToClassPath
(Path file, Configuration conf, FileSystem fs)
throws IOException {
String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
: classpath + "," + file.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
addCacheFile(uri, conf);
}
@ -318,10 +331,23 @@ public class DistributedCache {
@Deprecated
public static void addArchiveToClassPath(Path archive, Configuration conf)
throws IOException {
addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
}
/**
* Add an archive path to the current set of classpath entries. It adds the
* archive to cache as well. Intended to be used by user code.
*
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
* @param fs FileSystem with respect to which {@code archive} should be interpreted.
*/
public static void addArchiveToClassPath
(Path archive, Configuration conf, FileSystem fs)
throws IOException {
String classpath = conf.get(MRJobConfig.CLASSPATH_ARCHIVES);
conf.set(MRJobConfig.CLASSPATH_ARCHIVES, classpath == null ? archive
.toString() : classpath + "," + archive.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(archive).toUri();
addCacheArchive(uri, conf);

View File

@ -196,7 +196,7 @@ public class TestMROldApiJobs {
file.close();
}
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf);
DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
conf.setOutputCommitter(CustomOutputCommitter.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputKeyClass(LongWritable.class);

View File

@ -105,6 +105,7 @@ public class TestCompressionEmulationUtils {
conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE,
wordSize);
conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
conf.set("mapreduce.job.hdfs-servers", "");
FileSystem lfs = FileSystem.getLocal(conf);
@ -192,6 +193,7 @@ public class TestCompressionEmulationUtils {
CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
conf.set("mapreduce.job.hdfs-servers", "");
float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
if (ratio > 0) {

View File

@ -141,6 +141,7 @@ public class TestDistCacheEmulation {
boolean useOldProperties) throws IOException {
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set(MRJobConfig.USER_NAME, user);
conf.set("mapreduce.job.hdfs-servers", "");
// Set some dummy dist cache files in gridmix configuration so that they go
// into the configuration of JobStory objects.
String[] distCacheFiles = {"hdfs:///tmp/file1.txt",

View File

@ -521,6 +521,7 @@ public class TestGridmixSubmission {
DebugGridmix client = new DebugGridmix();
conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
conf.set("mapreduce.job.hdfs-servers", "");
if (useDefaultQueue) {
conf.setBoolean(GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, false);
conf.set(GridmixJob.GRIDMIX_DEFAULT_QUEUE, "q1");

View File

@ -205,6 +205,7 @@ public class TestSleepJob {
throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true);
conf.set("mapreduce.job.hdfs-servers", "");
DebugJobProducer jobProducer = new DebugJobProducer(5, conf);
JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
@ -253,6 +254,7 @@ public class TestSleepJob {
DebugGridmix client = new DebugGridmix();
conf = new Configuration();
conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy);
conf.set("mapreduce.job.hdfs-servers", "");
conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
// allow synthetic users to create home directories
GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777));

View File

@ -1144,7 +1144,7 @@ public class TestCombineFileInputFormat extends TestCase {
if (!(fs instanceof DistributedFileSystem)) {
throw new IOException("Wrong file system: " + fs.getClass().getName());
}
int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
long blockSize = fs.getDefaultBlockSize();
DummyInputFormat inFormat = new DummyInputFormat();
for (int i = 0; i < args.length; i++) {