Merge r1467713 through r1469041 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1469042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-04-17 19:41:50 +00:00
commit 2186912ff9
28 changed files with 1060 additions and 510 deletions

View File

@ -641,7 +641,7 @@ Release 2.0.4-alpha - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-9467. Metrics2 record filter should check name as well as tags. HADOOP-9467. Metrics2 record filter should check name as well as tags.
(Ganeshan Iyler via llu) (Chris Nauroth and Ganeshan Iyler via llu)
HADOOP-9406. hadoop-client leaks dependency on JDK tools jar. (tucu) HADOOP-9406. hadoop-client leaks dependency on JDK tools jar. (tucu)

View File

@ -461,6 +461,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4679. Namenode operation checks should be done in a consistent HDFS-4679. Namenode operation checks should be done in a consistent
manner. (suresh) manner. (suresh)
HDFS-4693. Some test cases in TestCheckpoint do not clean up after
themselves. (Arpit Agarwal, suresh via suresh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -565,6 +568,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4639. startFileInternal() should not increment generation stamp. HDFS-4639. startFileInternal() should not increment generation stamp.
(Plamen Jeliazkov via shv) (Plamen Jeliazkov via shv)
HDFS-4695. TestEditLog leaks open file handles between tests.
(Ivan Mitic via suresh)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -290,6 +290,7 @@ public class SecondaryNameNode implements Runnable {
try { try {
infoServer.join(); infoServer.join();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.debug("Exception ", ie);
} }
} }
@ -309,15 +310,25 @@ public class SecondaryNameNode implements Runnable {
} }
} }
try { try {
if (infoServer != null) infoServer.stop(); if (infoServer != null) {
infoServer.stop();
infoServer = null;
}
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Exception shutting down SecondaryNameNode", e); LOG.warn("Exception shutting down SecondaryNameNode", e);
} }
try { try {
if (checkpointImage != null) checkpointImage.close(); if (checkpointImage != null) {
checkpointImage.close();
checkpointImage = null;
}
} catch(IOException e) { } catch(IOException e) {
LOG.warn("Exception while closing CheckpointStorage", e); LOG.warn("Exception while closing CheckpointStorage", e);
} }
if (namesystem != null) {
namesystem.shutdown();
namesystem = null;
}
} }
@Override @Override

View File

@ -109,6 +109,7 @@ public class UpgradeUtilities {
config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeStorage.toString()); config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeStorage.toString());
config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, datanodeStorage.toString()); config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, datanodeStorage.toString());
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
String bpid = null;
try { try {
// format data-node // format data-node
createEmptyDirs(new String[] {datanodeStorage.toString()}); createEmptyDirs(new String[] {datanodeStorage.toString()});
@ -149,6 +150,7 @@ public class UpgradeUtilities {
// write more files // write more files
writeFile(fs, new Path(baseDir, "file3"), buffer, bufferSize); writeFile(fs, new Path(baseDir, "file3"), buffer, bufferSize);
writeFile(fs, new Path(baseDir, "file4"), buffer, bufferSize); writeFile(fs, new Path(baseDir, "file4"), buffer, bufferSize);
bpid = cluster.getNamesystem(0).getBlockPoolId();
} finally { } finally {
// shutdown // shutdown
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();
@ -160,7 +162,6 @@ public class UpgradeUtilities {
File dnCurDir = new File(datanodeStorage, "current"); File dnCurDir = new File(datanodeStorage, "current");
datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir); datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir);
String bpid = cluster.getNamesystem(0).getBlockPoolId();
File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current"); "current");
blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir); blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir);

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -634,6 +635,7 @@ public class TestEditLog {
// Now restore the backup // Now restore the backup
FileUtil.fullyDeleteContents(dfsDir); FileUtil.fullyDeleteContents(dfsDir);
dfsDir.delete();
backupDir.renameTo(dfsDir); backupDir.renameTo(dfsDir);
// Directory layout looks like: // Directory layout looks like:
@ -760,19 +762,24 @@ public class TestEditLog {
File log = new File(currentDir, File log = new File(currentDir,
NNStorage.getInProgressEditsFileName(3)); NNStorage.getInProgressEditsFileName(3));
new EditLogFileOutputStream(log, 1024).create(); EditLogFileOutputStream stream = new EditLogFileOutputStream(log, 1024);
if (!inBothDirs) { try {
break; stream.create();
} if (!inBothDirs) {
break;
}
NNStorage storage = new NNStorage(conf, NNStorage storage = new NNStorage(conf,
Collections.<URI>emptyList(), Collections.<URI>emptyList(),
Lists.newArrayList(uri)); Lists.newArrayList(uri));
if (updateTransactionIdFile) { if (updateTransactionIdFile) {
storage.writeTransactionIdFileToStorage(3); storage.writeTransactionIdFileToStorage(3);
}
storage.close();
} finally {
stream.close();
} }
storage.close();
} }
try { try {
@ -1335,12 +1342,15 @@ public class TestEditLog {
FSEditLog editlog = getFSEditLog(storage); FSEditLog editlog = getFSEditLog(storage);
editlog.initJournalsForWrite(); editlog.initJournalsForWrite();
long startTxId = 1; long startTxId = 1;
Collection<EditLogInputStream> streams = null;
try { try {
readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL), streams = editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
startTxId); readAllEdits(streams, startTxId);
} catch (IOException e) { } catch (IOException e) {
LOG.error("edit log failover didn't work", e); LOG.error("edit log failover didn't work", e);
fail("Edit log failover didn't work"); fail("Edit log failover didn't work");
} finally {
IOUtils.cleanup(null, streams.toArray(new EditLogInputStream[0]));
} }
} }
@ -1382,12 +1392,15 @@ public class TestEditLog {
FSEditLog editlog = getFSEditLog(storage); FSEditLog editlog = getFSEditLog(storage);
editlog.initJournalsForWrite(); editlog.initJournalsForWrite();
long startTxId = 1; long startTxId = 1;
Collection<EditLogInputStream> streams = null;
try { try {
readAllEdits(editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL), streams = editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
startTxId); readAllEdits(streams, startTxId);
} catch (IOException e) { } catch (IOException e) {
LOG.error("edit log failover didn't work", e); LOG.error("edit log failover didn't work", e);
fail("Edit log failover didn't work"); fail("Edit log failover didn't work");
} finally {
IOUtils.cleanup(null, streams.toArray(new EditLogInputStream[0]));
} }
} }

View File

@ -200,6 +200,9 @@ Release 2.0.5-beta - UNRELEASED
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
(Gelesh via bobby)
BUG FIXES BUG FIXES
MAPREDUCE-4671. AM does not tell the RM about container requests which are MAPREDUCE-4671. AM does not tell the RM about container requests which are
@ -299,6 +302,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5139. Update MR AM to use the modified startContainer API after MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
YARN-486. (Xuan Gong via vinodkv) YARN-486. (Xuan Gong via vinodkv)
MAPREDUCE-5151. Update MR AM to use standard exit codes from the API after
YARN-444. (Sandy Ryza via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -851,10 +857,16 @@ Release 0.23.8 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-5065. DistCp should skip checksum comparisons if block-sizes
are different on source/target (Mithun Radhakrishnan via kihwal)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
MAPREDUCE-5015. Coverage fix for org.apache.hadoop.mapreduce.tools.CLI
(Aleksey Gorshkov via tgraves)
Release 0.23.7 - UNRELEASED Release 0.23.7 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -67,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -624,7 +624,7 @@ public class RMContainerAllocator extends RMContainerRequestor
@VisibleForTesting @VisibleForTesting
public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
TaskAttemptId attemptID) { TaskAttemptId attemptID) {
if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) { if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
// killed by framework // killed by framework
return new TaskAttemptEvent(attemptID, return new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_KILL); TaskAttemptEventType.TA_KILL);

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -83,7 +84,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
@ -1660,7 +1660,7 @@ public class TestRMContainerAllocator {
ContainerStatus abortedStatus = BuilderUtils.newContainerStatus( ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
containerId, ContainerState.RUNNING, "", containerId, ContainerState.RUNNING, "",
YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS); ContainerExitStatus.ABORTED);
TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
attemptId); attemptId);

View File

@ -52,7 +52,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
public static final String MAX_LINE_LENGTH = public static final String MAX_LINE_LENGTH =
"mapreduce.input.linerecordreader.line.maxlength"; "mapreduce.input.linerecordreader.line.maxlength";
private CompressionCodecFactory compressionCodecs = null;
private long start; private long start;
private long pos; private long pos;
private long end; private long end;
@ -60,9 +59,9 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
private FSDataInputStream fileIn; private FSDataInputStream fileIn;
private Seekable filePosition; private Seekable filePosition;
private int maxLineLength; private int maxLineLength;
private LongWritable key = null; private LongWritable key;
private Text value = null; private Text value;
private CompressionCodec codec; private boolean isCompressedInput;
private Decompressor decompressor; private Decompressor decompressor;
private byte[] recordDelimiterBytes; private byte[] recordDelimiterBytes;
@ -81,13 +80,14 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
start = split.getStart(); start = split.getStart();
end = start + split.getLength(); end = start + split.getLength();
final Path file = split.getPath(); final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split // open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job); final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file); fileIn = fs.open(file);
if (isCompressedInput()) {
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec); decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) { if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = final SplitCompressionInputStream cIn =
@ -132,19 +132,16 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
this.pos = start; this.pos = start;
} }
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) { private int maxBytesToConsume(long pos) {
return isCompressedInput() return isCompressedInput
? Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) Math.min(Integer.MAX_VALUE, end - pos); : (int) Math.min(Integer.MAX_VALUE, end - pos);
} }
private long getFilePosition() throws IOException { private long getFilePosition() throws IOException {
long retVal; long retVal;
if (isCompressedInput() && null != filePosition) { if (isCompressedInput && null != filePosition) {
retVal = filePosition.getPos(); retVal = filePosition.getPos();
} else { } else {
retVal = pos; retVal = pos;
@ -166,9 +163,6 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
while (getFilePosition() <= end) { while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength, newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength)); Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize; pos += newSize;
if (newSize < maxLineLength) { if (newSize < maxLineLength) {
break; break;

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogDumper; import org.apache.hadoop.yarn.logaggregation.LogDumper;
@ -64,8 +65,6 @@ import com.google.common.base.Charsets;
public class CLI extends Configured implements Tool { public class CLI extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CLI.class); private static final Log LOG = LogFactory.getLog(CLI.class);
protected Cluster cluster; protected Cluster cluster;
private final Set<String> taskTypes = new HashSet<String>(
Arrays.asList("map", "reduce", "setup", "cleanup"));
private final Set<String> taskStates = new HashSet<String>( private final Set<String> taskStates = new HashSet<String>(
Arrays.asList("pending", "running", "completed", "failed", "killed")); Arrays.asList("pending", "running", "completed", "failed", "killed"));
@ -317,6 +316,7 @@ public class CLI extends Configured implements Tool {
exitCode = 0; exitCode = 0;
} else if (displayTasks) { } else if (displayTasks) {
displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState); displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
exitCode = 0;
} else if(killTask) { } else if(killTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid); TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = cluster.getJob(taskID.getJobID()); Job job = cluster.getJob(taskID.getJobID());
@ -563,16 +563,18 @@ public class CLI extends Configured implements Tool {
*/ */
protected void displayTasks(Job job, String type, String state) protected void displayTasks(Job job, String type, String state)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (!taskTypes.contains(type)) {
throw new IllegalArgumentException("Invalid type: " + type +
". Valid types for task are: map, reduce, setup, cleanup.");
}
if (!taskStates.contains(state)) { if (!taskStates.contains(state)) {
throw new java.lang.IllegalArgumentException("Invalid state: " + state + throw new java.lang.IllegalArgumentException("Invalid state: " + state +
". Valid states for task are: pending, running, completed, failed, killed."); ". Valid states for task are: pending, running, completed, failed, killed.");
} }
TaskReport[] reports=null;
TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type)); try{
reports = job.getTaskReports(TaskType.valueOf(type));
}catch(IllegalArgumentException e){
throw new IllegalArgumentException("Invalid type: " + type +
". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
}
for (TaskReport report : reports) { for (TaskReport report : reports) {
TIPStatus status = report.getCurrentStatus(); TIPStatus status = report.getCurrentStatus();
if ((state.equals("pending") && status ==TIPStatus.PENDING) || if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
@ -626,6 +628,6 @@ public class CLI extends Configured implements Tool {
public static void main(String[] argv) throws Exception { public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new CLI(), argv); int res = ToolRunner.run(new CLI(), argv);
System.exit(res); ExitUtil.terminate(res);
} }
} }

View File

@ -154,6 +154,7 @@
<configuration> <configuration>
<excludes> <excludes>
<exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude> <exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
<exclude>src/test/resources/job_1329348432655_0001-10.jhist</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce; package org.apache.hadoop.mapreduce;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
@ -30,18 +32,20 @@ import java.io.PrintStream;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase; import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.tools.CLI; import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.junit.Ignore; /**
import org.junit.Test; test CLI class. CLI class implemented the Tool interface.
@Ignore Here test that CLI sends correct command with options and parameters.
*/
public class TestMRJobClient extends ClusterMapReduceTestCase { public class TestMRJobClient extends ClusterMapReduceTestCase {
private static final Log LOG = LogFactory.getLog(TestMRJobClient.class); private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);
@ -49,16 +53,16 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
private Job runJob(Configuration conf) throws Exception { private Job runJob(Configuration conf) throws Exception {
String input = "hello1\nhello2\nhello3\n"; String input = "hello1\nhello2\nhello3\n";
Job job = MapReduceTestUtil.createJob(conf, Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
getInputDir(), getOutputDir(), 1, 1, input); 1, 1, input);
job.setJobName("mr"); job.setJobName("mr");
job.setPriority(JobPriority.HIGH); job.setPriority(JobPriority.NORMAL);
job.waitForCompletion(true); job.waitForCompletion(true);
return job; return job;
} }
public static int runTool(Configuration conf, Tool tool, public static int runTool(Configuration conf, Tool tool, String[] args,
String[] args, OutputStream out) throws Exception { OutputStream out) throws Exception {
PrintStream oldOut = System.out; PrintStream oldOut = System.out;
PrintStream newOut = new PrintStream(out, true); PrintStream newOut = new PrintStream(out, true);
try { try {
@ -69,20 +73,17 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
} }
} }
private static class BadOutputFormat private static class BadOutputFormat extends TextOutputFormat<Object, Object> {
extends TextOutputFormat {
@Override @Override
public void checkOutputSpecs(JobContext job) public void checkOutputSpecs(JobContext job) throws IOException {
throws FileAlreadyExistsException, IOException {
throw new IOException(); throw new IOException();
} }
} }
@Test
public void testJobSubmissionSpecsAndFiles() throws Exception { public void testJobSubmissionSpecsAndFiles() throws Exception {
Configuration conf = createJobConf(); Configuration conf = createJobConf();
Job job = MapReduceTestUtil.createJob(conf, Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
getInputDir(), getOutputDir(), 1, 1); 1, 1);
job.setOutputFormatClass(BadOutputFormat.class); job.setOutputFormatClass(BadOutputFormat.class);
try { try {
job.submit(); job.submit();
@ -90,60 +91,392 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
} catch (Exception e) { } catch (Exception e) {
assertTrue(e instanceof IOException); assertTrue(e instanceof IOException);
} }
JobID jobId = job.getJobID();
Cluster cluster = new Cluster(conf); Cluster cluster = new Cluster(conf);
Path jobStagingArea = JobSubmissionFiles.getStagingDir( Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
cluster,
job.getConfiguration()); job.getConfiguration());
Path submitJobDir = new Path(jobStagingArea, jobId.toString()); Path submitJobDir = new Path(jobStagingArea, "JobId");
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
assertFalse( assertFalse("Shouldn't have created a job file if job specs failed.",
"Shouldn't have created a job file if job specs failed.", FileSystem.get(conf).exists(submitJobFile));
FileSystem.get(conf).exists(submitJobFile)
);
} }
@Test /**
* main test method
*/
public void testJobClient() throws Exception { public void testJobClient() throws Exception {
Configuration conf = createJobConf(); Configuration conf = createJobConf();
Job job = runJob(conf); Job job = runJob(conf);
String jobId = job.getJobID().toString(); String jobId = job.getJobID().toString();
testGetCounter(jobId, conf); // test jobs list
testJobList(jobId, conf); testJobList(jobId, conf);
// test job counter
testGetCounter(jobId, conf);
// status
testJobStatus(jobId, conf);
// test list of events
testJobEvents(jobId, conf);
// test job history
testJobHistory(conf);
// test tracker list
testListTrackers(conf);
// attempts list
testListAttemptIds(jobId, conf);
// black list
testListBlackList(conf);
// test method main and help screen
startStop();
// test a change job priority .
testChangingJobPriority(jobId, conf); testChangingJobPriority(jobId, conf);
// submit job from file
testSubmit(conf);
// kill a task
testKillTask(job, conf);
// fail a task
testfailTask(job, conf);
// kill job
testKillJob(jobId, conf);
} }
@Test /**
public void testGetCounter(String jobId, * test fail task
Configuration conf) throws Exception { */
private void testfailTask(Job job, Configuration conf) throws Exception {
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
// TaskAttemptId is not set
int exitCode = runTool(conf, jc, new String[] { "-fail-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-fail-task", taid.toString() }, out);
fail(" this task should field");
} catch (YarnRemoteException e) {
// task completed !
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
}
/**
* test a kill task
*/
private void testKillTask(Job job, Configuration conf) throws Exception {
CLI jc = createJobClient();
TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
TaskAttemptID taid = new TaskAttemptID(tid, 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-kill-task" }, out);
assertEquals("Exit code", -1, exitCode);
try {
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
fail(" this task should be killed");
} catch (YarnRemoteException e) {
// task completed
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}
}
/**
* test a kill job
*/
private void testKillJob(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// without jobId
int exitCode = runTool(conf, jc, new String[] { "-kill" }, out);
assertEquals("Exit code", -1, exitCode);
// good parameters
exitCode = runTool(conf, jc, new String[] { "-kill", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray(), "UTF-8");
assertTrue(answer.contains("Killed job " + jobId));
}
/**
* test submit task from file
*/
private void testSubmit(Configuration conf) throws Exception {
CLI jc = createJobClient();
Job job = MapReduceTestUtil.createJob(conf, getInputDir(), getOutputDir(),
1, 1, "ping");
job.setJobName("mr");
job.setPriority(JobPriority.NORMAL);
File fcon = File.createTempFile("config", ".xml");
job.getConfiguration().writeXml(new FileOutputStream(fcon));
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad parameters
int exitCode = runTool(conf, jc, new String[] { "-submit" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc,
new String[] { "-submit", "file://" + fcon.getAbsolutePath() }, out);
assertEquals("Exit code", 0, exitCode);
String answer = new String(out.toByteArray());
// in console was written
assertTrue(answer.contains("Created job "));
}
/**
* test start form console command without options
*/
private void startStop() {
ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintStream error = System.err;
System.setErr(new PrintStream(data));
ExitUtil.disableSystemExit();
try {
CLI.main(new String[0]);
fail(" CLI.main should call System.exit");
} catch (ExitUtil.ExitException e) {
ExitUtil.resetFirstExitException();
assertEquals(-1, e.status);
} catch (Exception e) {
} finally {
System.setErr(error);
}
// in console should be written help text
String s = new String(data.toByteArray());
assertTrue(s.contains("-submit"));
assertTrue(s.contains("-status"));
assertTrue(s.contains("-kill"));
assertTrue(s.contains("-set-priority"));
assertTrue(s.contains("-events"));
assertTrue(s.contains("-history"));
assertTrue(s.contains("-list"));
assertTrue(s.contains("-list-active-trackers"));
assertTrue(s.contains("-list-blacklisted-trackers"));
assertTrue(s.contains("-list-attempt-ids"));
assertTrue(s.contains("-kill-task"));
assertTrue(s.contains("-fail-task"));
assertTrue(s.contains("-logs"));
}
/**
* black list
*/
private void testListBlackList(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] {
"-list-blacklisted-trackers", "second in" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-blacklisted-trackers" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(0, counter);
}
/**
* print AttemptIds list
*/
private void testListAttemptIds(String jobId, Configuration conf)
throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-attempt-ids", jobId,
"MAP", "completed" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(1, counter);
}
/**
* print tracker list
*/
private void testListTrackers(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-list-active-trackers",
"second parameter" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-list-active-trackers" }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
counter++;
}
assertEquals(2, counter);
}
/**
* print job history from file
*/
private void testJobHistory(Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
// bad command
int exitCode = runTool(conf, jc, new String[] { "-history", "pul",
"file://" + f.getAbsolutePath() }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-history", "all",
"file://" + f.getAbsolutePath() }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.startsWith("task_")) {
counter++;
}
}
assertEquals(23, counter);
}
/**
* print job events list
*/
private void testJobEvents(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
int exitCode = runTool(conf, jc, new String[] { "-events" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-events", jobId, "0", "100" },
out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
int counter = 0;
String attemptId = ("attempt" + jobId.substring(3));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(attemptId)) {
counter++;
}
}
assertEquals(2, counter);
}
/**
* print job status
*/
private void testJobStatus(String jobId, Configuration conf) throws Exception {
CLI jc = createJobClient();
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, jc, new String[] { "-status" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, jc, new String[] { "-status", jobId }, out);
assertEquals("Exit code", 0, exitCode);
String line;
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (!line.contains("Job state:")) {
continue;
}
break;
}
assertNotNull(line);
assertTrue(line.contains("SUCCEEDED"));
}
/**
* print counters
*/
public void testGetCounter(String jobId, Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad command
int exitCode = runTool(conf, createJobClient(), int exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
new String[] { "-counter", jobId, new String[] { "-counter", jobId,
"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" }, "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS" },
out); out);
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
assertEquals("Counter", "3", out.toString().trim()); assertEquals("Counter", "3", out.toString().trim());
} }
/**
* print a job list
*/
protected void testJobList(String jobId, Configuration conf) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
// bad options
int exitCode = runTool(conf, createJobClient(), new String[] { "-list",
"alldata" }, out);
assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(),
// all jobs
new String[] { "-list", "all" }, out);
assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(
new ByteArrayInputStream(out.toByteArray())));
String line;
int counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
counter++;
}
}
assertEquals(1, counter);
out.reset();
// only submitted
exitCode = runTool(conf, createJobClient(), new String[] { "-list" }, out);
assertEquals("Exit code", 0, exitCode);
br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
out.toByteArray())));
counter = 0;
while ((line = br.readLine()) != null) {
LOG.info("line = " + line);
if (line.contains(jobId)) {
counter++;
}
}
// all jobs submitted! no current
assertEquals(1, counter);
@Test
public void testJobList(String jobId,
Configuration conf) throws Exception {
verifyJobPriority(jobId, "HIGH", conf, createJobClient());
} }
protected void verifyJobPriority(String jobId, String priority, protected void verifyJobPriority(String jobId, String priority,
Configuration conf, CLI jc) throws Exception { Configuration conf, CLI jc) throws Exception {
PipedInputStream pis = new PipedInputStream(); PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis); PipedOutputStream pos = new PipedOutputStream(pis);
int exitCode = runTool(conf, jc, int exitCode = runTool(conf, jc, new String[] { "-list", "all" }, pos);
new String[] { "-list", "all" },
pos);
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
BufferedReader br = new BufferedReader(new InputStreamReader(pis)); BufferedReader br = new BufferedReader(new InputStreamReader(pis));
String line = null; String line;
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
LOG.info("line = " + line); LOG.info("line = " + line);
if (!line.startsWith(jobId)) { if (!line.contains(jobId)) {
continue; continue;
} }
assertTrue(line.contains(priority)); assertTrue(line.contains(priority));
@ -152,63 +485,16 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
pis.close(); pis.close();
} }
@Test
public void testChangingJobPriority(String jobId, Configuration conf) public void testChangingJobPriority(String jobId, Configuration conf)
throws Exception { throws Exception {
int exitCode = runTool(conf, createJobClient(), int exitCode = runTool(conf, createJobClient(),
new String[] { "-set-priority", jobId, "VERY_LOW" }, new String[] { "-set-priority" }, new ByteArrayOutputStream());
new ByteArrayOutputStream()); assertEquals("Exit code", -1, exitCode);
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode); assertEquals("Exit code", 0, exitCode);
verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient()); // because this method does not implemented still.
} verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
@Test
public void testMissingProfileOutput() throws Exception {
Configuration conf = createJobConf();
final String input = "hello1\n";
// Set a job to be profiled with an empty agentlib parameter.
// This will fail to create profile.out files for tasks.
// This will succeed by skipping the HTTP fetch of the
// profiler output.
Job job = MapReduceTestUtil.createJob(conf,
getInputDir(), getOutputDir(), 1, 1, input);
job.setJobName("disable-profile-fetch");
job.setProfileEnabled(true);
job.setProfileParams("-agentlib:,verbose=n,file=%s");
job.setMaxMapAttempts(1);
job.setMaxReduceAttempts(1);
job.setJobSetupCleanupNeeded(false);
job.waitForCompletion(true);
// Run another job with an hprof agentlib param; verify
// that the HTTP fetch works here.
Job job2 = MapReduceTestUtil.createJob(conf,
getInputDir(), getOutputDir(), 1, 1, input);
job2.setJobName("enable-profile-fetch");
job2.setProfileEnabled(true);
job2.setProfileParams(
"-agentlib:hprof=cpu=samples,heap=sites,force=n,"
+ "thread=y,verbose=n,file=%s");
job2.setProfileTaskRange(true, "0-1");
job2.setProfileTaskRange(false, "");
job2.setMaxMapAttempts(1);
job2.setMaxReduceAttempts(1);
job2.setJobSetupCleanupNeeded(false);
job2.waitForCompletion(true);
// Find the first map task, verify that we got its profile output file.
TaskReport [] reports = job2.getTaskReports(TaskType.MAP);
assertTrue("No task reports found!", reports.length > 0);
TaskReport report = reports[0];
TaskID id = report.getTaskId();
assertTrue(TaskType.MAP == id.getTaskType());
System.out.println("Using task id: " + id);
TaskAttemptID attemptId = new TaskAttemptID(id, 0);
File profileOutFile = new File(attemptId.toString() + ".profile");
assertTrue("Couldn't find profiler output", profileOutFile.exists());
assertTrue("Couldn't remove profiler output", profileOutFile.delete());
} }
protected CLI createJobClient() throws IOException { protected CLI createJobClient() throws IOException {

View File

@ -140,10 +140,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
private void compareCheckSums(FileSystem sourceFS, Path source, private void compareCheckSums(FileSystem sourceFS, Path source,
FileSystem targetFS, Path target) FileSystem targetFS, Path target)
throws IOException { throws IOException {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
throw new IOException("Check-sum mismatch between " StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
+ source + " and " + target); .append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
errorMessage.append(" Source and target differ in block-size.")
.append(" Use -pb to preserve block-sizes during copy.")
.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
}
throw new IOException(errorMessage.toString());
}
} }
//If target file exists and unable to delete target - fail //If target file exists and unable to delete target - fail

View File

@ -53,7 +53,7 @@ public class TestCopyMapper {
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
private static List<Path> pathList = new ArrayList<Path>(); private static List<Path> pathList = new ArrayList<Path>();
private static int nFiles = 0; private static int nFiles = 0;
private static final int FILE_SIZE = 1024; private static final int DEFAULT_FILE_SIZE = 1024;
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
@ -92,7 +92,7 @@ public class TestCopyMapper {
configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), configuration.setBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(),
false); false);
configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), configuration.setBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(),
true); false);
configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), configuration.setBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(),
true); true);
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
@ -112,6 +112,18 @@ public class TestCopyMapper {
touchFile(SOURCE_PATH + "/7/8/9"); touchFile(SOURCE_PATH + "/7/8/9");
} }
private static void createSourceDataWithDifferentBlockSize() throws Exception {
mkdirs(SOURCE_PATH + "/1");
mkdirs(SOURCE_PATH + "/2");
mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5");
touchFile(SOURCE_PATH + "/5/6", true);
mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9");
}
private static void mkdirs(String path) throws Exception { private static void mkdirs(String path) throws Exception {
FileSystem fileSystem = cluster.getFileSystem(); FileSystem fileSystem = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@ -121,17 +133,31 @@ public class TestCopyMapper {
} }
private static void touchFile(String path) throws Exception { private static void touchFile(String path) throws Exception {
touchFile(path, false);
}
private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
final long NON_DEFAULT_BLOCK_SIZE = 4096;
FileSystem fs; FileSystem fs;
DataOutputStream outputStream = null; DataOutputStream outputStream = null;
try { try {
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(), final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
fs.getWorkingDirectory()); fs.getWorkingDirectory());
final long blockSize = fs.getDefaultBlockSize(qualifiedPath) * 2; final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
outputStream = fs.create(qualifiedPath, true, 0, outputStream = fs.create(qualifiedPath, true, 0,
(short)(fs.getDefaultReplication(qualifiedPath)*2), (short)(fs.getDefaultReplication(qualifiedPath)*2),
blockSize); blockSize);
outputStream.write(new byte[FILE_SIZE]); byte[] bytes = new byte[DEFAULT_FILE_SIZE];
outputStream.write(bytes);
long fileSize = DEFAULT_FILE_SIZE;
if (createMultipleBlocks) {
while (fileSize < 2*blockSize) {
outputStream.write(bytes);
outputStream.flush();
fileSize += DEFAULT_FILE_SIZE;
}
}
pathList.add(qualifiedPath); pathList.add(qualifiedPath);
++nFiles; ++nFiles;
@ -144,7 +170,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testRun() { public void testRun() {
try { try {
deleteState(); deleteState();
@ -179,7 +205,7 @@ public class TestCopyMapper {
Assert.assertEquals(pathList.size(), Assert.assertEquals(pathList.size(),
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue()); stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
Assert.assertEquals(nFiles * FILE_SIZE, Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
testCopyingExistingFiles(fs, copyMapper, context); testCopyingExistingFiles(fs, copyMapper, context);
@ -211,7 +237,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testMakeDirFailure() { public void testMakeDirFailure() {
try { try {
deleteState(); deleteState();
@ -239,13 +265,13 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testIgnoreFailures() { public void testIgnoreFailures() {
doTestIgnoreFailures(true); doTestIgnoreFailures(true);
doTestIgnoreFailures(false); doTestIgnoreFailures(false);
} }
@Test @Test(timeout=40000)
public void testDirToFile() { public void testDirToFile() {
try { try {
deleteState(); deleteState();
@ -273,7 +299,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testPreserve() { public void testPreserve() {
try { try {
deleteState(); deleteState();
@ -343,7 +369,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testCopyReadableFiles() { public void testCopyReadableFiles() {
try { try {
deleteState(); deleteState();
@ -406,7 +432,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testSkipCopyNoPerms() { public void testSkipCopyNoPerms() {
try { try {
deleteState(); deleteState();
@ -480,7 +506,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testFailCopyWithAccessControlException() { public void testFailCopyWithAccessControlException() {
try { try {
deleteState(); deleteState();
@ -563,7 +589,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testFileToDir() { public void testFileToDir() {
try { try {
deleteState(); deleteState();
@ -640,12 +666,48 @@ public class TestCopyMapper {
cluster.getFileSystem().delete(new Path(TARGET_PATH), true); cluster.getFileSystem().delete(new Path(TARGET_PATH), true);
} }
@Test @Test(timeout=40000)
public void testPreserveBlockSizeAndReplication() { public void testPreserveBlockSizeAndReplication() {
testPreserveBlockSizeAndReplicationImpl(true); testPreserveBlockSizeAndReplicationImpl(true);
testPreserveBlockSizeAndReplicationImpl(false); testPreserveBlockSizeAndReplicationImpl(false);
} }
@Test(timeout=40000)
public void testCopyFailOnBlockSizeDifference() {
try {
deleteState();
createSourceDataWithDifferentBlockSize();
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= EnumSet.noneOf(DistCpOptions.FileAttribute.class);
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
DistCpUtils.packAttributes(fileAttributes));
copyMapper.setup(context);
for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
fileStatus, context);
}
Assert.fail("Copy should have failed because of block-size difference.");
}
catch (Exception exception) {
// Check that the exception suggests the use of -pb/-skipCrc.
Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb"));
Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc"));
}
}
private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){ private void testPreserveBlockSizeAndReplicationImpl(boolean preserve){
try { try {
@ -717,7 +779,7 @@ public class TestCopyMapper {
* If a single file is being copied to a location where the file (of the same * If a single file is being copied to a location where the file (of the same
* name) already exists, then the file shouldn't be skipped. * name) already exists, then the file shouldn't be skipped.
*/ */
@Test @Test(timeout=40000)
public void testSingleFileCopy() { public void testSingleFileCopy() {
try { try {
deleteState(); deleteState();
@ -766,7 +828,7 @@ public class TestCopyMapper {
} }
} }
@Test @Test(timeout=40000)
public void testPreserveUserGroup() { public void testPreserveUserGroup() {
testPreserveUserGroupImpl(true); testPreserveUserGroupImpl(true);
testPreserveUserGroupImpl(false); testPreserveUserGroupImpl(false);

View File

@ -82,6 +82,9 @@ Release 2.0.5-beta - UNRELEASED
RM as a direct parameter instead of as part of the ContainerLaunchContext RM as a direct parameter instead of as part of the ContainerLaunchContext
record. (Xuan Gong via vinodkv) record. (Xuan Gong via vinodkv)
YARN-444. Moved special container exit codes from YarnConfiguration to API
where they belong. (Sandy Ryza via vinodkv)
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS
@ -235,6 +238,9 @@ Release 2.0.5-beta - UNRELEASED
than its host:port during scheduling which caused incorrect locality for than its host:port during scheduling which caused incorrect locality for
containers. (Roger Hoover via acmurthy) containers. (Roger Hoover via acmurthy)
YARN-500. Fixed YARN webapps to not roll-over ports when explicitly asked
to use non-ephemeral ports. (Kenji Kikushima via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* Container exit statuses indicating special exit circumstances.
*/
@Public
@Evolving
public class ContainerExitStatus {
public static final int SUCCESS = 0;
public static final int INVALID = -1000;
/**
* Containers killed by the framework, either due to being released by
* the application or being 'lost' due to node failures etc.
*/
public static final int ABORTED = -100;
/**
* When threshold number of the nodemanager-local-directories or
* threshold number of the nodemanager-log-directories become bad.
*/
public static final int DISKS_FAILED = -101;
}

View File

@ -68,16 +68,17 @@ public interface ContainerStatus {
* *
* <p>Note: This is valid only for completed containers i.e. containers * <p>Note: This is valid only for completed containers i.e. containers
* with state {@link ContainerState#COMPLETE}. * with state {@link ContainerState#COMPLETE}.
* Otherwise, it returns an invalid exit code equal to {@literal -1000};</p> * Otherwise, it returns an ContainerExitStatus.INVALID.
* </p>
* *
* <p>Container killed by the framework, either due to being released by * <p>Containers killed by the framework, either due to being released by
* the application or being 'lost' due to node failures etc. have a special * the application or being 'lost' due to node failures etc. have a special
* exit code of {@literal -100}.</p> * exit code of ContainerExitStatus.ABORTED.</p>
* *
* <p>When threshold number of the nodemanager-local-directories or * <p>When threshold number of the nodemanager-local-directories or
* threshold number of the nodemanager-log-directories become bad, then * threshold number of the nodemanager-log-directories become bad, then
* container is not launched and is exited with exit status of * container is not launched and is exited with ContainersExitStatus.DISKS_FAILED.
* {@literal -101}.</p> * </p>
* *
* @return <em>exit status</em> for the container * @return <em>exit status</em> for the container
*/ */

View File

@ -280,7 +280,12 @@ message ContainerStatusProto {
optional int32 exit_status = 4 [default = -1000]; optional int32 exit_status = 4 [default = -1000];
} }
enum ContainerExitStatusProto {
SUCCESS = 0;
INVALID = -1000;
ABORTED = -100;
DISKS_FAILED = -101;
}
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
////// From common////////////////////////////////////////////////////// ////// From common//////////////////////////////////////////////////////

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -556,7 +557,7 @@ public class ApplicationMaster {
int exitStatus = containerStatus.getExitStatus(); int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) { if (0 != exitStatus) {
// container failed // container failed
if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) { if (ContainerExitStatus.ABORTED != exitStatus) {
// shell script failed // shell script failed
// counts as completed // counts as completed
numCompletedContainers.incrementAndGet(); numCompletedContainers.incrementAndGet();

View File

@ -583,11 +583,6 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_USER_HOME_DIR= "/home/"; public static final String DEFAULT_NM_USER_HOME_DIR= "/home/";
public static final int INVALID_CONTAINER_EXIT_STATUS = -1000;
public static final int ABORTED_CONTAINER_EXIT_STATUS = -100;
public static final int DISKS_FAILED = -101;
//////////////////////////////// ////////////////////////////////
// Web Proxy Configs // Web Proxy Configs
//////////////////////////////// ////////////////////////////////

View File

@ -97,13 +97,14 @@ public class WebApps {
public Builder<T> at(String bindAddress) { public Builder<T> at(String bindAddress) {
String[] parts = StringUtils.split(bindAddress, ':'); String[] parts = StringUtils.split(bindAddress, ':');
if (parts.length == 2) { if (parts.length == 2) {
return at(parts[0], Integer.parseInt(parts[1]), true); int port = Integer.parseInt(parts[1]);
return at(parts[0], port, port == 0);
} }
return at(bindAddress, 0, true); return at(bindAddress, 0, true);
} }
public Builder<T> at(int port) { public Builder<T> at(int port) {
return at("0.0.0.0", port, false); return at("0.0.0.0", port, port == 0);
} }
public Builder<T> at(String address, int port, boolean findPort) { public Builder<T> at(String address, int port, boolean findPort) {

View File

@ -161,6 +161,30 @@ public class TestWebApp {
app.stop(); app.stop();
} }
@Test(expected=org.apache.hadoop.yarn.webapp.WebAppException.class)
public void testCreateWithBindAddressNonZeroPort() {
WebApp app = WebApps.$for(this).at("0.0.0.0:50000").start();
int port = app.getListenerAddress().getPort();
assertEquals(50000, port);
// start another WebApp with same NonZero port
WebApp app2 = WebApps.$for(this).at("0.0.0.0:50000").start();
// An exception occurs (findPort disabled)
app.stop();
app2.stop();
}
@Test(expected=org.apache.hadoop.yarn.webapp.WebAppException.class)
public void testCreateWithNonZeroPort() {
WebApp app = WebApps.$for(this).at(50000).start();
int port = app.getListenerAddress().getPort();
assertEquals(50000, port);
// start another WebApp with same NonZero port
WebApp app2 = WebApps.$for(this).at(50000).start();
// An exception occurs (findPort disabled)
app.stop();
app2.stop();
}
@Test public void testServePaths() { @Test public void testServePaths() {
WebApp app = WebApps.$for("test", this).start(); WebApp app = WebApps.$for("test", this).start();
assertEquals("/test", app.getRedirectPath()); assertEquals("/test", app.getRedirectPath());

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -78,7 +79,7 @@ public class ContainerImpl implements Container {
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext; private final ContainerLaunchContext launchContext;
private final org.apache.hadoop.yarn.api.records.Container container; private final org.apache.hadoop.yarn.api.records.Container container;
private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS; private int exitCode = ContainerExitStatus.INVALID;
private final StringBuilder diagnostics; private final StringBuilder diagnostics;
/** The NM-wide configuration - not specific to this container */ /** The NM-wide configuration - not specific to this container */

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -185,7 +186,7 @@ public class ContainerLaunch implements Callable<Integer> {
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
if (!dirsHandler.areDisksHealthy()) { if (!dirsHandler.areDisksHealthy()) {
ret = YarnConfiguration.DISKS_FAILED; ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. " throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport()); + dirsHandler.getDisksHealthReport());
} }

View File

@ -26,6 +26,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -63,8 +64,9 @@ public class ContainerInfo {
this.nodeId = nmContext.getNodeId().toString(); this.nodeId = nmContext.getNodeId().toString();
ContainerStatus containerData = container.cloneAndGetContainerStatus(); ContainerStatus containerData = container.cloneAndGetContainerStatus();
this.exitCode = containerData.getExitStatus(); this.exitCode = containerData.getExitStatus();
this.exitStatus = (this.exitCode == YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS) ? "N/A" this.exitStatus =
: String.valueOf(exitCode); (this.exitCode == ContainerExitStatus.INVALID) ?
"N/A" : String.valueOf(exitCode);
this.state = container.getContainerState().toString(); this.state = container.getContainerState().toString();
this.diagnostics = containerData.getDiagnostics(); this.diagnostics = containerData.getDiagnostics();
if (this.diagnostics == null || this.diagnostics.isEmpty()) { if (this.diagnostics == null || this.diagnostics.isEmpty()) {

View File

@ -21,12 +21,12 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
@ -76,7 +76,7 @@ public class SchedulerUtils {
containerStatus.setContainerId(containerId); containerStatus.setContainerId(containerId);
containerStatus.setDiagnostics(diagnostics); containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus( containerStatus.setExitStatus(
YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS); ContainerExitStatus.ABORTED);
containerStatus.setState(ContainerState.COMPLETE); containerStatus.setState(ContainerState.COMPLETE);
return containerStatus; return containerStatus;
} }