svn merge -c 1461146 FIXES: MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred (Aleksey Gorshkov via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461149 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2013-03-26 14:17:59 +00:00
parent 17a304973c
commit 4bc46b766d
22 changed files with 1529 additions and 290 deletions


@@ -17,6 +17,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4990. Construct debug strings conditionally in
ShuffleHandler.Shuffle#sendMapOutput(). (kkambatl via tucu)
MAPREDUCE-4875. coverage fixing for org.apache.hadoop.mapred
(Aleksey Gorshkov via bobby)
OPTIMIZATIONS
BUG FIXES


@@ -32,6 +32,8 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -78,7 +80,7 @@ public class TestTaskAttemptListenerImpl {
}
}
@Test (timeout=5000)
public void testGetTask() throws IOException {
AppContext appCtx = mock(AppContext.class);
JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
@@ -136,9 +138,30 @@ public class TestTaskAttemptListenerImpl {
assertTrue(result.shouldDie);
listener.stop();
// test JVMID
JVMId jvmid = JVMId.forName("jvm_001_002_m_004");
assertNotNull(jvmid);
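// JVMId strings appear to follow jvm_<jobtracker>_<job>_<m|r>_<id>; the
// extra fifth numeric field below is expected to make forName() fail.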
try {
JVMId.forName("jvm_001_002_m_004_006");
Assert.fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"TaskId string : jvm_001_002_m_004_006 is not properly formed");
}
}
@Test (timeout=5000)
public void testJVMId() {
JVMId jvmid = new JVMId("test", 1, true, 2);
JVMId jvmid1 = JVMId.forName("jvm_test_0001_m_000002");
// test compare method: the two IDs should be equal
assertEquals(0, jvmid.compareTo(jvmid1));
}
@Test (timeout=10000)
public void testGetMapCompletionEvents() throws IOException {
TaskAttemptCompletionEvent[] empty = {};
TaskAttemptCompletionEvent[] taskEvents = {
@@ -205,7 +228,7 @@ public class TestTaskAttemptListenerImpl {
return tce;
}
@Test (timeout=1000)
public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock();


@@ -19,8 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -39,62 +37,7 @@ public class JobEndNotifier {
private static final Log LOG =
LogFactory.getLog(JobEndNotifier.class.getName());
private static Thread thread;
private static volatile boolean running;
private static BlockingQueue<JobEndStatusInfo> queue =
new DelayQueue<JobEndStatusInfo>();
public static void startNotifier() {
running = true;
thread = new Thread(
new Runnable() {
public void run() {
try {
while (running) {
sendNotification(queue.take());
}
}
catch (InterruptedException irex) {
if (running) {
LOG.error("Thread has ended unexpectedly", irex);
}
}
}
private void sendNotification(JobEndStatusInfo notification) {
try {
int code = httpNotification(notification.getUri());
if (code != 200) {
throw new IOException("Invalid response status code: " + code);
}
}
catch (IOException ioex) {
LOG.error("Notification failure [" + notification + "]", ioex);
if (notification.configureForRetry()) {
try {
queue.put(notification);
}
catch (InterruptedException iex) {
LOG.error("Notification queuing error [" + notification + "]",
iex);
}
}
}
catch (Exception ex) {
LOG.error("Notification failure [" + notification + "]", ex);
}
}
}
);
thread.start();
}
public static void stopNotifier() {
running = false;
thread.interrupt();
}
private static JobEndStatusInfo createNotification(JobConf conf,
JobStatus status) {
@@ -118,18 +61,6 @@ public class JobEndNotifier {
return notification;
}
public static void registerNotification(JobConf jobConf, JobStatus status) {
JobEndStatusInfo notification = createNotification(jobConf, status);
if (notification != null) {
try {
queue.put(notification);
}
catch (InterruptedException iex) {
LOG.error("Notification queuing failure [" + notification + "]", iex);
}
}
}
private static int httpNotification(String uri) throws IOException {
URI url = new URI(uri, false);
HttpClient m_client = new HttpClient();
@@ -194,10 +125,6 @@ public class JobEndNotifier {
return retryInterval;
}
public long getDelayTime() {
return delayTime;
}
public boolean configureForRetry() {
boolean retry = false;
if (getRetryAttempts() > 0) {
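For context on what this file's hunks delete: the old notifier ran a background thread that drained a DelayQueue and re-queued failed notifications until their retries ran out. A minimal standalone sketch of that pattern, with simplified names that are not part of the Hadoop API:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class NotifierSketch {
  static class Attempt implements Delayed {
    volatile long readyAt;   // wall-clock millis when this attempt may run
    int retriesLeft;
    Attempt(long readyAt, int retriesLeft) {
      this.readyAt = readyAt;
      this.retriesLeft = retriesLeft;
    }
    public long getDelay(TimeUnit unit) {
      return unit.convert(readyAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    public int compareTo(Delayed o) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
  }
  private final DelayQueue<Attempt> queue = new DelayQueue<Attempt>();
  // take() blocks until the head's delay expires, exactly like the
  // removed sendNotification(queue.take()) loop.
  void loop() throws InterruptedException {
    while (true) {
      Attempt a = queue.take();
      if (!trySend() && a.retriesLeft-- > 0) {
        a.readyAt = System.currentTimeMillis() + 1000L; // back off, then retry
        queue.put(a);
      }
    }
  }
  boolean trySend() { return true; } // stands in for the HTTP notification
}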


@@ -40,7 +40,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.net.URL;
@@ -487,73 +486,8 @@ public class QueueManager {
new QueueAclsInfo[queueAclsInfolist.size()]);
}
/**
* ONLY FOR TESTING - Do not use in production code.
* This method is used for setting up of leafQueues only.
* We are not setting the hierarchy here.
*
* @param queues
*/
synchronized void setQueues(Queue[] queues) {
root.getChildren().clear();
leafQueues.clear();
allQueues.clear();
for (Queue queue : queues) {
root.addChild(queue);
}
//At this point we have root populated
//update data structures leafNodes.
leafQueues = getRoot().getLeafQueues();
allQueues.putAll(getRoot().getInnerQueues());
allQueues.putAll(leafQueues);
}
/**
* Return an array of {@link JobQueueInfo} objects for the root
* queues configured in the system.
* <p/>
* Root queues are queues that are at the top-most level in the
* hierarchy of queues in mapred-queues.xml, or they are the queues
* configured in the mapred.queue.names key in mapred-site.xml.
*
* @return array of JobQueueInfo objects for root level queues.
*/
JobQueueInfo[] getRootQueues() {
List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
return list.toArray(new JobQueueInfo[list.size()]);
}
/**
* Get the complete hierarchy of children for queue
* queueName
*
* @param queueName
* @return
*/
JobQueueInfo[] getChildQueues(String queueName) {
List<JobQueueInfo> list =
allQueues.get(queueName).getJobQueueInfo().getChildren();
if (list != null) {
return list.toArray(new JobQueueInfo[list.size()]);
} else {
return new JobQueueInfo[0];
}
}
/**
* Used only for testing purposes .
* This method is unstable as refreshQueues would leave this
* data structure in unstable state.
*
* @param queueName
* @return
*/
Queue getQueue(String queueName) {
return this.allQueues.get(queueName);
}
/**
* Return if ACLs are enabled for the Map/Reduce system
@@ -573,29 +507,7 @@
return root;
}
/**
* Returns the specific queue ACL for the given queue.
* Returns null if the given queue does not exist or the acl is not
* configured for that queue.
* If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
* ACL with all users.
*/
synchronized AccessControlList getQueueACL(String queueName,
QueueACL qACL) {
if (areAclsEnabled) {
Queue q = leafQueues.get(queueName);
if (q != null) {
return q.getAcls().get(toFullPropertyName(
queueName, qACL.getAclName()));
}
else {
LOG.warn("Queue " + queueName + " is not present.");
return null;
}
}
return new AccessControlList("*");
}
/**
* Dumps the configuration of hierarchy of queues
* @param out the writer object to which dump is written


@@ -386,73 +386,6 @@ public class TaskLog {
return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
}
/**
* Wrap a command in a shell to capture stdout and stderr to files.
* If the tailLength is 0, the entire output will be saved.
* @param cmd The command and the arguments that should be run
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
* @return the modified command that should be run
*/
public static List<String> captureOutAndError(List<String> cmd,
File stdoutFilename,
File stderrFilename,
long tailLength
) throws IOException {
return captureOutAndError(null, cmd, stdoutFilename,
stderrFilename, tailLength, false);
}
/**
* Wrap a command in a shell to capture stdout and stderr to files.
* Setup commands such as setting memory limit can be passed which
* will be executed before exec.
* If the tailLength is 0, the entire output will be saved.
* @param setup The setup commands for the execed process.
* @param cmd The command and the arguments that should be run
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
* @return the modified command that should be run
*/
public static List<String> captureOutAndError(List<String> setup,
List<String> cmd,
File stdoutFilename,
File stderrFilename,
long tailLength
) throws IOException {
return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
tailLength, false);
}
/**
* Wrap a command in a shell to capture stdout and stderr to files.
* Setup commands such as setting memory limit can be passed which
* will be executed before exec.
* If the tailLength is 0, the entire output will be saved.
* @param setup The setup commands for the execed process.
* @param cmd The command and the arguments that should be run
* @param stdoutFilename The filename that stdout should be saved to
* @param stderrFilename The filename that stderr should be saved to
* @param tailLength The length of the tail to be saved.
* @param pidFileName The name of the pid-file. pid-file's usage is deprecated
* @return the modified command that should be run
*
* @deprecated pidFiles are no more used. Instead pid is exported to
* env variable JVM_PID.
*/
@Deprecated
public static List<String> captureOutAndError(List<String> setup,
List<String> cmd,
File stdoutFilename,
File stderrFilename,
long tailLength,
String pidFileName
) throws IOException {
return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
tailLength, false);
}
/**
* Wrap a command in a shell to capture stdout and stderr to files.
@@ -607,25 +540,6 @@ public class TaskLog {
return command.toString();
}
/**
* Wrap a command in a shell to capture debug script's
* stdout and stderr to debugout.
* @param cmd The command and the arguments that should be run
* @param debugoutFilename The filename that stdout and stderr
* should be saved to.
* @return the modified command that should be run
* @throws IOException
*/
public static List<String> captureDebugOut(List<String> cmd,
File debugoutFilename
) throws IOException {
String debugout = FileUtil.makeShellPath(debugoutFilename);
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
result.add(buildDebugScriptCommandLine(cmd, debugout));
return result;
}
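All of the helpers removed above implement the single idea their javadoc describes: run the real command under a shell so stdout and stderr can be redirected to files. A rough sketch of that shape, as a hypothetical helper that ignores setup commands and tail lengths:

import java.io.File;
import java.util.Arrays;
import java.util.List;

class ShellCaptureSketch {
  // Builds: bash -c "'cmd' 'arg' ... > stdout 2> stderr"
  static List<String> wrap(List<String> cmd, File stdout, File stderr) {
    StringBuilder script = new StringBuilder();
    for (String s : cmd) {
      script.append('\'').append(s).append("' ");
    }
    script.append("> ").append(stdout.getPath())
          .append(" 2> ").append(stderr.getPath());
    return Arrays.asList("bash", "-c", script.toString());
  }
}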
/**
* Method to return the location of user log directory.


@@ -523,17 +523,5 @@ public abstract class TaskStatus implements Writable, Cloneable {
return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
}
static TaskStatus readTaskStatus(DataInput in) throws IOException {
boolean isMap = in.readBoolean();
TaskStatus taskStatus = createTaskStatus(isMap);
taskStatus.readFields(in);
return taskStatus;
}
static void writeTaskStatus(DataOutput out, TaskStatus taskStatus)
throws IOException {
out.writeBoolean(taskStatus.getIsMap());
taskStatus.write(out);
}
}


@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* test Clock class
*
*/
public class TestClock {
@Test (timeout=1000)
public void testClock(){
Clock clock= new Clock();
long templateTime=System.currentTimeMillis();
long time=clock.getTime();
assertEquals(templateTime, time,30);
}
}


@@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.regex.Pattern;
import static org.junit.Assert.*;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
/**
* test JobConf
*
*/
public class TestJobConf {
/**
* test getters and setters of JobConf
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testJobConf() {
JobConf conf = new JobConf();
// test default value
Pattern pattern = conf.getJarUnpackPattern();
assertEquals(Pattern.compile("(?:classes/|lib/).*").toString(),
pattern.toString());
// default value
assertFalse(conf.getKeepFailedTaskFiles());
conf.setKeepFailedTaskFiles(true);
assertTrue(conf.getKeepFailedTaskFiles());
// default value
assertNull(conf.getKeepTaskFilesPattern());
conf.setKeepTaskFilesPattern("123454");
assertEquals("123454", conf.getKeepTaskFilesPattern());
// default value
assertNotNull(conf.getWorkingDirectory());
conf.setWorkingDirectory(new Path("test"));
assertTrue(conf.getWorkingDirectory().toString().endsWith("test"));
// default value
assertEquals(1, conf.getNumTasksToExecutePerJvm());
// default value
assertNull(conf.getKeyFieldComparatorOption());
conf.setKeyFieldComparatorOptions("keySpec");
assertEquals("keySpec", conf.getKeyFieldComparatorOption());
// default value
assertFalse(conf.getUseNewReducer());
conf.setUseNewReducer(true);
assertTrue(conf.getUseNewReducer());
// default
assertTrue(conf.getMapSpeculativeExecution());
assertTrue(conf.getReduceSpeculativeExecution());
assertTrue(conf.getSpeculativeExecution());
conf.setReduceSpeculativeExecution(false);
assertTrue(conf.getSpeculativeExecution());
conf.setMapSpeculativeExecution(false);
assertFalse(conf.getSpeculativeExecution());
assertFalse(conf.getMapSpeculativeExecution());
assertFalse(conf.getReduceSpeculativeExecution());
conf.setSessionId("ses");
assertEquals("ses", conf.getSessionId());
assertEquals(3, conf.getMaxTaskFailuresPerTracker());
conf.setMaxTaskFailuresPerTracker(2);
assertEquals(2, conf.getMaxTaskFailuresPerTracker());
assertEquals(0, conf.getMaxMapTaskFailuresPercent());
conf.setMaxMapTaskFailuresPercent(50);
assertEquals(50, conf.getMaxMapTaskFailuresPercent());
assertEquals(0, conf.getMaxReduceTaskFailuresPercent());
conf.setMaxReduceTaskFailuresPercent(70);
assertEquals(70, conf.getMaxReduceTaskFailuresPercent());
// by default
assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
conf.setJobPriority(JobPriority.HIGH);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
assertNull(conf.getJobSubmitHostName());
conf.setJobSubmitHostName("hostname");
assertEquals("hostname", conf.getJobSubmitHostName());
// default
assertNull(conf.getJobSubmitHostAddress());
conf.setJobSubmitHostAddress("ww");
assertEquals("ww", conf.getJobSubmitHostAddress());
// default value
assertFalse(conf.getProfileEnabled());
conf.setProfileEnabled(true);
assertTrue(conf.getProfileEnabled());
// default value
assertEquals(conf.getProfileTaskRange(true).toString(), "0-2");
assertEquals(conf.getProfileTaskRange(false).toString(), "0-2");
conf.setProfileTaskRange(true, "0-3");
assertEquals(conf.getProfileTaskRange(false).toString(), "0-2");
assertEquals(conf.getProfileTaskRange(true).toString(), "0-3");
// default value
assertNull(conf.getMapDebugScript());
conf.setMapDebugScript("mDbgScript");
assertEquals("mDbgScript", conf.getMapDebugScript());
// default value
assertNull(conf.getReduceDebugScript());
conf.setReduceDebugScript("rDbgScript");
assertEquals("rDbgScript", conf.getReduceDebugScript());
// default value
assertNull(conf.getJobLocalDir());
assertEquals("default", conf.getQueueName());
conf.setQueueName("qname");
assertEquals("qname", conf.getQueueName());
assertEquals(1, conf.computeNumSlotsPerMap(100L));
assertEquals(1, conf.computeNumSlotsPerReduce(100L));
conf.setMemoryForMapTask(100 * 1000);
assertEquals(1000, conf.computeNumSlotsPerMap(100L));
conf.setMemoryForReduceTask(1000 * 1000);
assertEquals(1000, conf.computeNumSlotsPerReduce(1000L));
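// assumption drawn from the values above: computeNumSlotsPer* divides the
// configured task memory by the given per-slot memory
// (100*1000 / 100 = 1000 and 1000*1000 / 1000 = 1000)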
assertEquals(-1, conf.getMaxPhysicalMemoryForTask());
assertEquals("The variable key is no longer used.",
JobConf.deprecatedString("key"));
}
}


@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* test class JobInfo
*
*
*/
public class TestJobInfo {
@Test (timeout=5000)
public void testJobInfo() throws IOException {
JobID jid = new JobID("001", 1);
Text user = new Text("User");
Path path = new Path("/tmp/test");
JobInfo info = new JobInfo(jid, user, path);
ByteArrayOutputStream out = new ByteArrayOutputStream();
info.write(new DataOutputStream(out));
JobInfo copyinfo = new JobInfo();
copyinfo.readFields(new DataInputStream(new ByteArrayInputStream(out
.toByteArray())));
assertEquals(info.getJobID().toString(), copyinfo.getJobID().toString());
assertEquals(info.getJobSubmitDir().getName(), copyinfo.getJobSubmitDir()
.getName());
assertEquals(info.getUser().toString(), copyinfo.getUser().toString());
}
}
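TestJobInfo, and several of the tests below, repeat one Writable round-trip pattern: serialize into a byte array, deserialize into a fresh instance, and compare fields. The shared shape as a hypothetical generic helper (not part of this patch):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

class RoundTrip {
  // Serialize src, then populate dst from the same bytes and return it.
  static <T extends Writable> T of(T src, T dst) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    src.write(new DataOutputStream(out));
    dst.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
    return dst;
  }
}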


@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapred.TaskCompletionEvent.Status;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Test deprecated methods
*
*/
public class TestOldMethodsJobID {
/**
* test deprecated methods of TaskID
* @throws IOException
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testDepricatedMethods() throws IOException {
JobID jid = new JobID();
TaskID test = new TaskID(jid, true, 1);
assertEquals(test.getTaskType(), TaskType.MAP);
test = new TaskID(jid, false, 1);
assertEquals(test.getTaskType(), TaskType.REDUCE);
test = new TaskID("001", 1, false, 1);
assertEquals(test.getTaskType(), TaskType.REDUCE);
test = new TaskID("001", 1, true, 1);
assertEquals(test.getTaskType(), TaskType.MAP);
ByteArrayOutputStream out = new ByteArrayOutputStream();
test.write(new DataOutputStream(out));
TaskID ti = TaskID.read(new DataInputStream(new ByteArrayInputStream(out
.toByteArray())));
assertEquals(ti.toString(), test.toString());
assertEquals("task_001_0001_m_000002",
TaskID.getTaskIDsPattern("001", 1, true, 2));
assertEquals("task_003_0001_m_000004",
TaskID.getTaskIDsPattern("003", 1, TaskType.MAP, 4));
assertEquals("003_0001_m_000004",
TaskID.getTaskIDsPatternWOPrefix("003", 1, TaskType.MAP, 4).toString());
}
/**
* test JobID
* @throws IOException
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testJobID() throws IOException{
JobID jid = new JobID("001",2);
ByteArrayOutputStream out = new ByteArrayOutputStream();
jid.write(new DataOutputStream(out));
assertEquals(jid,JobID.read(new DataInputStream(new ByteArrayInputStream(out.toByteArray()))));
assertEquals("job_001_0001",JobID.getJobIDsPattern("001",1));
}
/**
* test deprecated methods of TaskCompletionEvent
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testTaskCompletionEvent() {
TaskAttemptID taid = new TaskAttemptID("001", 1, TaskType.REDUCE, 2, 3);
TaskCompletionEvent template = new TaskCompletionEvent(12, taid, 13, true,
Status.SUCCEEDED, "httptracker");
TaskCompletionEvent testEl = TaskCompletionEvent.downgrade(template);
testEl.setTaskAttemptId(taid);
testEl.setTaskTrackerHttp("httpTracker");
testEl.setTaskId("attempt_001_0001_m_000002_04");
assertEquals("attempt_001_0001_m_000002_4",testEl.getTaskId());
testEl.setTaskStatus(Status.OBSOLETE);
assertEquals(Status.OBSOLETE.toString(), testEl.getStatus().toString());
testEl.setTaskRunTime(20);
assertEquals(testEl.getTaskRunTime(), 20);
testEl.setEventId(16);
assertEquals(testEl.getEventId(), 16);
}
/**
* test deprecated methods of JobProfile
* @throws IOException
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testJobProfile() throws IOException {
JobProfile profile = new JobProfile("user", "job_001_03", "jobFile", "uri",
"name");
assertEquals("job_001_0003", profile.getJobId());
assertEquals("default", profile.getQueueName());
// serialization test
ByteArrayOutputStream out = new ByteArrayOutputStream();
profile.write(new DataOutputStream(out));
JobProfile profile2 = new JobProfile();
profile2.readFields(new DataInputStream(new ByteArrayInputStream(out
.toByteArray())));
assertEquals(profile2.name, profile.name);
assertEquals(profile2.jobFile, profile.jobFile);
assertEquals(profile2.queueName, profile.queueName);
assertEquals(profile2.url, profile.url);
assertEquals(profile2.user, profile.user);
}
/**
* test TaskAttemptID
*/
@SuppressWarnings( "deprecation" )
@Test (timeout=5000)
public void testTaskAttemptID (){
TaskAttemptID task = new TaskAttemptID("001",2,true,3,4);
assertEquals("attempt_001_0002_m_000003_4", TaskAttemptID.getTaskAttemptIDsPattern("001", 2, true, 3, 4));
assertEquals("task_001_0002_m_000003", task.getTaskID().toString());
assertEquals("attempt_001_0001_r_000002_3",TaskAttemptID.getTaskAttemptIDsPattern("001", 1, TaskType.REDUCE, 2, 3));
assertEquals("001_0001_m_000001_2", TaskAttemptID.getTaskAttemptIDsPatternWOPrefix("001",1, TaskType.MAP, 1, 2).toString());
}
/**
* test Reporter.NULL
*
*/
@Test (timeout=5000)
public void testReporter(){
Reporter nullReporter=Reporter.NULL;
assertNull(nullReporter.getCounter(null));
assertNull(nullReporter.getCounter("group", "name"));
// getInputSplit is not supported by the NULL reporter
try{
assertNull(nullReporter.getInputSplit());
}catch(UnsupportedOperationException e){
assertEquals( "NULL reporter has no input",e.getMessage());
}
assertEquals(0,nullReporter.getProgress(),0.01);
}
}


@@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
import static junit.framework.Assert.*;
import static org.mockito.Mockito.*;
/**
* TestQueue checks the sanity and recoverability of the queue configuration
*/
public class TestQueue {
/**
* test QueueManager
* configuration from file
*
* @throws IOException
*/
@Test (timeout=5000)
public void testQueue() throws IOException {
File f = null;
try {
f = writeFile();
QueueManager manager = new QueueManager(f.getCanonicalPath(), true);
manager.setSchedulerInfo("first", "queueInfo");
manager.setSchedulerInfo("second", "queueInfoqueueInfo");
Queue root = manager.getRoot();
assertTrue(root.getChildren().size() == 2);
Iterator<Queue> iterator = root.getChildren().iterator();
Queue firstSubQueue = iterator.next();
assertTrue(firstSubQueue.getName().equals("first"));
assertEquals(
firstSubQueue.getAcls().get("mapred.queue.first.acl-submit-job")
.toString(),
"Users [user1, user2] and members of the groups [group1, group2] are allowed");
Queue secondSubQueue = iterator.next();
assertTrue(secondSubQueue.getName().equals("second"));
assertEquals(secondSubQueue.getProperties().getProperty("key"), "value");
assertEquals(secondSubQueue.getProperties().getProperty("key1"), "value1");
// test status
assertEquals(firstSubQueue.getState().getStateName(), "running");
assertEquals(secondSubQueue.getState().getStateName(), "stopped");
Set<String> template = new HashSet<String>();
template.add("first");
template.add("second");
assertEquals(manager.getLeafQueueNames(), template);
// test user access
UserGroupInformation mockUGI = mock(UserGroupInformation.class);
when(mockUGI.getShortUserName()).thenReturn("user1");
String[] groups = { "group1" };
when(mockUGI.getGroupNames()).thenReturn(groups);
assertTrue(manager.hasAccess("first", QueueACL.SUBMIT_JOB, mockUGI));
assertFalse(manager.hasAccess("second", QueueACL.SUBMIT_JOB, mockUGI));
assertFalse(manager.hasAccess("first", QueueACL.ADMINISTER_JOBS, mockUGI));
when(mockUGI.getShortUserName()).thenReturn("user3");
assertTrue(manager.hasAccess("first", QueueACL.ADMINISTER_JOBS, mockUGI));
QueueAclsInfo[] qai = manager.getQueueAcls(mockUGI);
assertEquals(qai.length, 1);
// test refresh queue
manager.refreshQueues(getConfiguration(), null);
iterator = root.getChildren().iterator();
Queue firstSubQueue1 = iterator.next();
Queue secondSubQueue1 = iterator.next();
// test equals method
assertTrue(firstSubQueue.equals(firstSubQueue1));
assertEquals(firstSubQueue1.getState().getStateName(), "running");
assertEquals(secondSubQueue1.getState().getStateName(), "stopped");
assertEquals(firstSubQueue1.getSchedulingInfo(), "queueInfo");
assertEquals(secondSubQueue1.getSchedulingInfo(), "queueInfoqueueInfo");
// test JobQueueInfo
assertEquals(firstSubQueue.getJobQueueInfo().getQueueName(), "first");
assertEquals(firstSubQueue.getJobQueueInfo().getQueueState(), "running");
assertEquals(firstSubQueue.getJobQueueInfo().getSchedulingInfo(),
"queueInfo");
assertEquals(secondSubQueue.getJobQueueInfo().getChildren().size(), 0);
// test
assertEquals(manager.getSchedulerInfo("first"), "queueInfo");
assertEquals(manager.getJobQueueInfos()[0].getQueueName(), secondSubQueue
.getJobQueueInfo().getQueueName());
assertEquals(manager.getJobQueueInfos()[1].getQueueName(), firstSubQueue
.getJobQueueInfo().getQueueName());
// test getJobQueueInfoMapping
assertEquals(
manager.getJobQueueInfoMapping().get("first").getQueueName(), "first");
// test dumpConfiguration
Writer writer = new StringWriter();
Configuration conf = getConfiguration();
conf.unset(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY);
QueueManager.dumpConfiguration(writer, f.getAbsolutePath(), conf);
String result = writer.toString();
assertTrue(result
.indexOf("\"name\":\"first\",\"state\":\"running\",\"acl_submit_job\":\"user1,user2 group1,group2\",\"acl_administer_jobs\":\"user3,user4 group3,group4\",\"properties\":[],\"children\":[]") > 0);
writer = new StringWriter();
QueueManager.dumpConfiguration(writer, conf);
result = writer.toString();
assertEquals(
"{\"queues\":[{\"name\":\"default\",\"state\":\"running\",\"acl_submit_job\":\"*\",\"acl_administer_jobs\":\"*\",\"properties\":[],\"children\":[]},{\"name\":\"q1\",\"state\":\"running\",\"acl_submit_job\":\" \",\"acl_administer_jobs\":\" \",\"properties\":[],\"children\":[{\"name\":\"q1:q2\",\"state\":\"running\",\"acl_submit_job\":\" \",\"acl_administer_jobs\":\" \",\"properties\":[{\"key\":\"capacity\",\"value\":\"20\"},{\"key\":\"user-limit\",\"value\":\"30\"}],\"children\":[]}]}]}",
result);
// test constructor QueueAclsInfo
QueueAclsInfo qi = new QueueAclsInfo();
assertNull(qi.getQueueName());
} finally {
if (f != null) {
f.delete();
}
}
}
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set(DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY,
"first,second");
conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX
+ "first.acl-submit-job", "user1,user2 group1,group2");
conf.set(MRConfig.MR_ACLS_ENABLED, "true");
conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX + "first.state",
"running");
conf.set(QueueManager.QUEUE_CONF_PROPERTY_NAME_PREFIX + "second.state",
"stopped");
return conf;
}
@Test (timeout=5000)
public void testDefaultConfig() {
QueueManager manager = new QueueManager(true);
assertEquals(manager.getRoot().getChildren().size(), 2);
}
/**
* test QueueManager constructed directly from a Configuration
*
* @throws IOException
*/
@Test (timeout=5000)
public void test2Queue() throws IOException {
Configuration conf = getConfiguration();
QueueManager manager = new QueueManager(conf);
manager.setSchedulerInfo("first", "queueInfo");
manager.setSchedulerInfo("second", "queueInfoqueueInfo");
Queue root = manager.getRoot();
// test children queues
assertTrue(root.getChildren().size() == 2);
Iterator<Queue> iterator = root.getChildren().iterator();
Queue firstSubQueue = iterator.next();
assertTrue(firstSubQueue.getName().equals("first"));
assertEquals(
firstSubQueue.getAcls().get("mapred.queue.first.acl-submit-job")
.toString(),
"Users [user1, user2] and members of the groups [group1, group2] are allowed");
Queue secondSubQueue = iterator.next();
assertTrue(secondSubQueue.getName().equals("second"));
assertEquals(firstSubQueue.getState().getStateName(), "running");
assertEquals(secondSubQueue.getState().getStateName(), "stopped");
assertTrue(manager.isRunning("first"));
assertFalse(manager.isRunning("second"));
assertEquals(firstSubQueue.getSchedulingInfo(), "queueInfo");
assertEquals(secondSubQueue.getSchedulingInfo(), "queueInfoqueueInfo");
// test leaf queue
Set<String> template = new HashSet<String>();
template.add("first");
template.add("second");
assertEquals(manager.getLeafQueueNames(), template);
}
/**
* write the sample queue configuration file
* @return the temporary configuration file
* @throws IOException
*/
private File writeFile() throws IOException {
File f = null;
f = File.createTempFile("tst", "xml");
BufferedWriter out = new BufferedWriter(new FileWriter(f));
String properties = "<properties><property key=\"key\" value=\"value\"/><property key=\"key1\" value=\"value1\"/></properties>";
out.write("<queues>");
out.newLine();
out.write("<queue><name>first</name><acl-submit-job>user1,user2 group1,group2</acl-submit-job><acl-administer-jobs>user3,user4 group3,group4</acl-administer-jobs><state>running</state></queue>");
out.newLine();
out.write("<queue><name>second</name><acl-submit-job>u1,u2 g1,g2</acl-submit-job>"
+ properties + "<state>stopped</state></queue>");
out.newLine();
out.write("</queues>");
out.flush();
out.close();
return f;
}
}


@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* test SkipBadRecords
*
*
*/
public class TestSkipBadRecords {
@Test (timeout=5000)
public void testSkipBadRecords() {
// test default values
Configuration conf = new Configuration();
assertEquals(2, SkipBadRecords.getAttemptsToStartSkipping(conf));
assertTrue(SkipBadRecords.getAutoIncrMapperProcCount(conf));
assertTrue(SkipBadRecords.getAutoIncrReducerProcCount(conf));
assertEquals(0, SkipBadRecords.getMapperMaxSkipRecords(conf));
assertEquals(0, SkipBadRecords.getReducerMaxSkipGroups(conf), 0);
assertNull(SkipBadRecords.getSkipOutputPath(conf));
// test setters
SkipBadRecords.setAttemptsToStartSkipping(conf, 5);
SkipBadRecords.setAutoIncrMapperProcCount(conf, false);
SkipBadRecords.setAutoIncrReducerProcCount(conf, false);
SkipBadRecords.setMapperMaxSkipRecords(conf, 6L);
SkipBadRecords.setReducerMaxSkipGroups(conf, 7L);
JobConf jc= new JobConf();
SkipBadRecords.setSkipOutputPath(jc, new Path("test"));
// test getters
assertEquals(5, SkipBadRecords.getAttemptsToStartSkipping(conf));
assertFalse(SkipBadRecords.getAutoIncrMapperProcCount(conf));
assertFalse(SkipBadRecords.getAutoIncrReducerProcCount(conf));
assertEquals(6L, SkipBadRecords.getMapperMaxSkipRecords(conf));
assertEquals(7L, SkipBadRecords.getReducerMaxSkipGroups(conf), 0);
assertEquals("test",SkipBadRecords.getSkipOutputPath(jc).toString());
}
}


@@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.Test;
import static junit.framework.Assert.*;
import static org.mockito.Mockito.*;
/**
* Tests for the TaskLog class
*/
public class TestTaskLog {
/**
* test TaskLog log-file and index-file handling
*
* @throws IOException
*/
@Test (timeout=50000)
public void testTaskLog() throws IOException {
// test TaskLog
System.setProperty(MRJobConfig.TASK_LOG_DIR, "testString");
assertEquals(TaskLog.getMRv2LogDir(), "testString");
TaskAttemptID taid = mock(TaskAttemptID.class);
JobID jid = new JobID("job", 1);
when(taid.getJobID()).thenReturn(jid);
when(taid.toString()).thenReturn("JobId");
File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
assertTrue(f.getAbsolutePath().endsWith("testString/stdout"));
// test getRealTaskLogFileLocation
File indexFile = TaskLog.getIndexFile(taid, true);
if (!indexFile.getParentFile().exists()) {
indexFile.getParentFile().mkdirs();
}
indexFile.delete();
indexFile.createNewFile();
TaskLog.syncLogs("location", taid, true);
assertTrue(indexFile.getAbsolutePath().endsWith(
"userlogs/job_job_0001/JobId.cleanup/log.index"));
f = TaskLog.getRealTaskLogFileLocation(taid, true, LogName.DEBUGOUT);
if (f != null) {
assertTrue(f.getAbsolutePath().endsWith("location/debugout"));
FileUtils.copyFile(indexFile, f);
}
// test obtainLogDirOwner
assertTrue(TaskLog.obtainLogDirOwner(taid).length() > 0);
// test TaskLog.Reader
assertTrue(readTaskLog(TaskLog.LogName.DEBUGOUT, taid, true).length() > 0);
}
public String readTaskLog(TaskLog.LogName filter,
org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
throws IOException {
// string buffer to store task log
StringBuffer result = new StringBuffer();
int res;
// read the whole task log through an InputStream
InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
isCleanup);
// construct string log from inputstream.
byte[] b = new byte[65536];
while (true) {
res = taskLogReader.read(b);
if (res > 0) {
result.append(new String(b, 0, res)); // append only the bytes actually read
} else {
break;
}
}
taskLogReader.close();
// trim the string and return it
String str = result.toString();
str = str.trim();
return str;
}
/**
* test without TASK_LOG_DIR
*
* @throws IOException
*/
@Test (timeout=50000)
public void testTaskLogWithoutTaskLogDir() throws IOException {
// TaskLog tasklog= new TaskLog();
System.clearProperty(MRJobConfig.TASK_LOG_DIR);
// test TaskLog
assertEquals(TaskLog.getMRv2LogDir(), null);
TaskAttemptID taid = mock(TaskAttemptID.class);
JobID jid = new JobID("job", 1);
when(taid.getJobID()).thenReturn(jid);
when(taid.toString()).thenReturn("JobId");
File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
assertTrue(f.getAbsolutePath().endsWith("stdout"));
}
}


@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.StringWriter;
import java.io.Writer;
import org.apache.log4j.Category;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.Priority;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestTaskLogAppender {
/**
* test TaskLogAppender
*/
@SuppressWarnings("deprecation")
@Test (timeout=5000)
public void testTaskLogAppender(){
TaskLogAppender appender= new TaskLogAppender();
System.setProperty(TaskLogAppender.TASKID_PROPERTY,"attempt_01_02_m03_04_001");
System.setProperty(TaskLogAppender.LOGSIZE_PROPERTY, "1003");
appender.activateOptions();
assertEquals(appender.getTaskId(), "attempt_01_02_m03_04_001");
assertEquals(appender.getTotalLogFileSize(),1000);
assertEquals(appender.getIsCleanup(),false);
// test writer
Writer writer= new StringWriter();
appender.setWriter(writer);
Layout layout = new PatternLayout("%-5p [%t]: %m%n");
appender.setLayout(layout);
Category logger= Logger.getLogger(getClass().getName());
LoggingEvent event = new LoggingEvent("fqnOfCategoryClass", logger, Priority.INFO, "message", new Throwable());
appender.append(event);
appender.flush() ;
appender.close();
assertTrue(writer.toString().length()>0);
// test that the cleanup flag is not changed by activateOptions
appender= new TaskLogAppender();
appender.setIsCleanup(true);
appender.activateOptions();
assertEquals(appender.getIsCleanup(),true);
}
}


@@ -0,0 +1,69 @@
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- This is the template for queue configuration. The format supports nesting of
queues within queues - a feature called hierarchical queues. All queues are
defined within the 'queues' tag which is the top level element for this
XML document.
The 'aclsEnabled' attribute should be set to true, if ACLs should be checked
on queue operations such as submitting jobs, killing jobs etc. -->
<queues aclsEnabled="false">
<!-- Configuration for a queue is specified by defining a 'queue' element. -->
<queue>
<!-- Name of a queue. Queue name cannot contain a ':' -->
<name>default</name>
<!-- properties for a queue, typically used by schedulers,
can be defined here -->
<properties>
</properties>
<!-- State of the queue. If running, the queue will accept new jobs.
If stopped, the queue will not accept new jobs. -->
<state>running</state>
<!-- Specifies the ACLs to check for submitting jobs to this queue.
If set to '*', it allows all users to submit jobs to the queue.
For specifying a list of users and groups the format to use is
user1,user2 group1,group2 -->
<acl-submit-job>*</acl-submit-job>
<!-- Specifies the ACLs to check for modifying jobs in this queue.
Modifications include killing jobs, tasks of jobs or changing
priorities.
If set to '*', it allows all users to modify jobs in the queue.
For specifying a list of users and groups the format to use is
user1,user2 group1,group2 -->
<acl-administer-jobs>*</acl-administer-jobs>
</queue>
<!-- Here is a sample of a hierarchical queue configuration
where q2 is a child of q1. In this example, q2 is a leaf level
queue as it has no queues configured within it. Currently, ACLs
and state are only supported for the leaf level queues.
Note also the usage of properties for the queue q2. -->
<queue>
<name>q1</name>
<queue>
<name>q2</name>
<properties>
<property key="capacity" value="20"/>
<property key="user-limit" value="30"/>
</properties>
</queue>
</queue>
</queues>


@@ -26,12 +26,12 @@ import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestIFile {
@Test
/**
* Create an IFile.Writer using GzipCodec since this codec does not
* have a compressor when run via the tests (i.e. no native libraries).
*/
public void testIFileWriterWithCodec() throws Exception {
@@ -63,5 +63,11 @@ public class TestIFile {
IFile.Reader<Text, Text> reader =
new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
reader.close();
// test checksum read
byte[] ab = new byte[100];
int readed = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
assertEquals(readed, reader.checksumIn.getChecksum().length);
}
}


@@ -21,13 +21,20 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* test MultiFileSplit class
*/
public class TestMultiFileSplit extends TestCase{
public void testReadWrite() throws Exception {
@@ -58,4 +65,26 @@ public class TestMultiFileSplit extends TestCase{
assertTrue(Arrays.equals(split.getLengths(), readSplit.getLengths()));
System.out.println(split.toString());
}
/**
* test method getLocations
* @throws IOException
*/
public void testgetLocations() throws IOException{
JobConf job= new JobConf();
File tmpFile = File.createTempFile("test","txt");
tmpFile.createNewFile();
OutputStream out=new FileOutputStream(tmpFile);
out.write("tempfile".getBytes());
out.flush();
out.close();
Path[] path= {new Path(tmpFile.getAbsolutePath())};
long[] lengths = {100};
MultiFileSplit split = new MultiFileSplit(job,path,lengths);
String [] locations= split.getLocations();
assertTrue(locations.length==1);
assertEquals(locations[0], "localhost");
}
}


@@ -18,24 +18,37 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapred.JobClient.NetworkedJob;
import org.apache.hadoop.mapred.JobClient.TaskStatusFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.yarn.YarnException;
import org.junit.Test;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
public class TestNetworkedJob {
private static String TEST_ROOT_DIR = new File(System.getProperty(
@@ -44,8 +57,7 @@ public class TestNetworkedJob {
private static Path inFile = new Path(testDir, "in");
private static Path outDir = new Path(testDir, "out");
@Test (timeout=5000)
public void testGetNullCounters() throws Exception {
//mock creation
Job mockJob = mock(Job.class);
@@ -57,7 +69,7 @@ public class TestNetworkedJob {
verify(mockJob).getCounters();
}
@Test (timeout=500000)
public void testGetJobStatus() throws IOException, InterruptedException,
ClassNotFoundException {
MiniMRClientCluster mr = null;
@@ -105,4 +117,278 @@ public class TestNetworkedJob {
}
}
}
/**
* test NetworkedJob against a MiniMRClientCluster
* @throws Exception
*/
@SuppressWarnings( "deprecation" )
@Test (timeout=500000)
public void testNetworkedJob() throws Exception {
// mock creation
MiniMRClientCluster mr = null;
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
JobConf job = new JobConf(mr.getConfig());
fileSys = FileSystem.get(job);
fileSys.delete(testDir, true);
FSDataOutputStream out = fileSys.create(inFile, true);
out.writeBytes("This is a test file");
out.close();
FileInputFormat.setInputPaths(job, inFile);
FileOutputFormat.setOutputPath(job, outDir);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setNumReduceTasks(0);
JobClient client = new JobClient(mr.getConfig());
RunningJob rj = client.submitJob(job);
JobID jobId = rj.getID();
NetworkedJob runningJob = (NetworkedJob) client.getJob(jobId);
runningJob.setJobPriority(JobPriority.HIGH.name());
// test getters
assertTrue(runningJob.getConfiguration().toString()
.endsWith("0001/job.xml"));
assertEquals(runningJob.getID(), jobId);
assertEquals(runningJob.getJobID(), jobId.toString());
assertEquals(runningJob.getJobName(), "N/A");
assertTrue(runningJob.getJobFile().endsWith(
".staging/" + runningJob.getJobID() + "/job.xml"));
assertTrue(runningJob.getTrackingURL().length() > 0);
assertTrue(runningJob.mapProgress() == 0.0f);
assertTrue(runningJob.reduceProgress() == 0.0f);
assertTrue(runningJob.cleanupProgress() == 0.0f);
assertTrue(runningJob.setupProgress() == 0.0f);
TaskCompletionEvent[] tce = runningJob.getTaskCompletionEvents(0);
assertEquals(tce.length, 0);
assertEquals(runningJob.getHistoryUrl(),"");
assertFalse(runningJob.isRetired());
assertEquals( runningJob.getFailureInfo(),"");
assertEquals(runningJob.getJobStatus().getJobName(), "N/A");
assertEquals(client.getMapTaskReports(jobId).length, 0);
try {
client.getSetupTaskReports(jobId);
} catch (YarnException e) {
assertEquals(e.getMessage(), "Unrecognized task type: JOB_SETUP");
}
try {
client.getCleanupTaskReports(jobId);
} catch (YarnException e) {
assertEquals(e.getMessage(), "Unrecognized task type: JOB_CLEANUP");
}
assertEquals(client.getReduceTaskReports(jobId).length, 0);
// test ClusterStatus
ClusterStatus status = client.getClusterStatus(true);
assertEquals(status.getActiveTrackerNames().size(), 2);
// these methods are not implemented and always return an empty array or null
assertEquals(status.getBlacklistedTrackers(), 0);
assertEquals(status.getBlacklistedTrackerNames().size(), 0);
assertEquals(status.getBlackListedTrackersInfo().size(), 0);
assertEquals(status.getJobTrackerStatus(), JobTrackerStatus.RUNNING);
assertEquals(status.getMapTasks(), 1);
assertEquals(status.getMaxMapTasks(), 20);
assertEquals(status.getMaxReduceTasks(), 4);
assertEquals(status.getNumExcludedNodes(), 0);
assertEquals(status.getReduceTasks(), 1);
assertEquals(status.getTaskTrackers(), 2);
assertEquals(status.getTTExpiryInterval(), 0);
assertEquals(status.getJobTrackerStatus(), JobTrackerStatus.RUNNING);
// test read and write
ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
status.write(new DataOutputStream(dataOut));
ClusterStatus status2 = new ClusterStatus();
status2.readFields(new DataInputStream(new ByteArrayInputStream(dataOut
.toByteArray())));
assertEquals(status.getActiveTrackerNames(),
status2.getActiveTrackerNames());
assertEquals(status.getBlackListedTrackersInfo(),
status2.getBlackListedTrackersInfo());
assertEquals(status.getMapTasks(), status2.getMapTasks());
try {
} catch (RuntimeException e) {
assertTrue(e.getMessage().endsWith("not found on CLASSPATH"));
}
// test TaskStatusFilter
JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
assertEquals(JobClient.getTaskOutputFilter(job), TaskStatusFilter.ALL);
// runningJob.setJobPriority(JobPriority.HIGH.name());
// test default map
assertEquals(client.getDefaultMaps(), 20);
assertEquals(client.getDefaultReduces(), 4);
assertEquals(client.getSystemDir().getName(), "jobSubmitDir");
// test queue information
JobQueueInfo[] rootQueueInfo = client.getRootQueues();
assertEquals(rootQueueInfo.length, 1);
assertEquals(rootQueueInfo[0].getQueueName(), "default");
JobQueueInfo[] qinfo = client.getQueues();
assertEquals(qinfo.length, 1);
assertEquals(qinfo[0].getQueueName(), "default");
assertEquals(client.getChildQueues("default").length, 0);
assertEquals(client.getJobsFromQueue("default").length, 1);
assertTrue(client.getJobsFromQueue("default")[0].getJobFile().endsWith(
"/job.xml"));
JobQueueInfo qi = client.getQueueInfo("default");
assertEquals(qi.getQueueName(), "default");
assertEquals(qi.getQueueState(), "running");
QueueAclsInfo[] aai = client.getQueueAclsForCurrentUser();
assertEquals(aai.length, 2);
assertEquals(aai[0].getQueueName(), "root");
assertEquals(aai[1].getQueueName(), "default");
// test token
Token<DelegationTokenIdentifier> token = client
.getDelegationToken(new Text(UserGroupInformation.getCurrentUser()
.getShortUserName()));
assertEquals(token.getKind().toString(), "RM_DELEGATION_TOKEN");
// test JobClient
// The following asserts read JobStatus twice and ensure the returned
// JobStatus objects correspond to the same Job.
assertEquals("Expected matching JobIDs", jobId, client.getJob(jobId)
.getJobStatus().getJobID());
assertEquals("Expected matching startTimes", rj.getJobStatus()
.getStartTime(), client.getJob(jobId).getJobStatus().getStartTime());
} finally {
if (fileSys != null) {
fileSys.delete(testDir, true);
}
if (mr != null) {
mr.stop();
}
}
}
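The ClusterStatus checks above and the BlackListInfo test below rely on the same Writable write/readFields round trip. A minimal helper sketch (hypothetical, not part of this patch; it assumes the ByteArray/Data stream classes this test already imports, plus org.apache.hadoop.io.Writable):

private static <T extends org.apache.hadoop.io.Writable> T roundTrip(T src, T dst)
throws IOException {
// serialize src into a byte buffer, then rehydrate dst from the same bytes
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
src.write(new DataOutputStream(buffer));
dst.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
return dst;
}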
/**
* Test the BlackListInfo class: getters and the Writable round trip.
*
* @throws IOException
*/
@Test (timeout=5000)
public void testBlackListInfo() throws IOException {
BlackListInfo info = new BlackListInfo();
info.setBlackListReport("blackListInfo");
info.setReasonForBlackListing("reasonForBlackListing");
info.setTrackerName("trackerName");
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(byteOut);
info.write(out);
BlackListInfo info2 = new BlackListInfo();
info2.readFields(new DataInputStream(new ByteArrayInputStream(byteOut
.toByteArray())));
assertEquals(info, info2);
assertEquals(info.toString(), info2.toString());
assertEquals(info.getTrackerName(), "trackerName");
assertEquals(info.getReasonForBlackListing(), "reasonForBlackListing");
assertEquals(info.getBlackListReport(), "blackListInfo");
}
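With the roundTrip sketch above, the write/readFields boilerplate in this test collapses to a usage one-liner (same hypothetical helper):

BlackListInfo copy = roundTrip(info, new BlackListInfo());
assertEquals(info, copy);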
/**
* Test running JobQueueClient from the command line.
*
* @throws Exception
*/
@Test (timeout=500000)
public void testJobQueueClient() throws Exception {
MiniMRClientCluster mr = null;
FileSystem fileSys = null;
PrintStream oldOut = System.out;
try {
Configuration conf = new Configuration();
mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
JobConf job = new JobConf(mr.getConfig());
fileSys = FileSystem.get(job);
fileSys.delete(testDir, true);
FSDataOutputStream out = fileSys.create(inFile, true);
out.writeBytes("This is a test file");
out.close();
FileInputFormat.setInputPaths(job, inFile);
FileOutputFormat.setOutputPath(job, outDir);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
job.setNumReduceTasks(0);
JobClient client = new JobClient(mr.getConfig());
client.submitJob(job);
JobQueueClient jobClient = new JobQueueClient(job);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
System.setOut(new PrintStream(bytes));
String[] arg = { "-list" };
jobClient.run(arg);
assertTrue(bytes.toString().contains("Queue Name : default"));
assertTrue(bytes.toString().contains("Queue State : running"));
bytes = new ByteArrayOutputStream();
System.setOut(new PrintStream(bytes));
String[] arg1 = { "-showacls" };
jobClient.run(arg1);
assertTrue(bytes.toString().contains("Queue acls for user :"));
assertTrue(bytes.toString().contains(
"root ADMINISTER_QUEUE,SUBMIT_APPLICATIONS"));
assertTrue(bytes.toString().contains(
"default ADMINISTER_QUEUE,SUBMIT_APPLICATIONS"));
// test for info and default queue
bytes = new ByteArrayOutputStream();
System.setOut(new PrintStream(bytes));
String[] arg2 = { "-info", "default" };
jobClient.run(arg2);
assertTrue(bytes.toString().contains("Queue Name : default"));
assertTrue(bytes.toString().contains("Queue State : running"));
assertTrue(bytes.toString().contains("Scheduling Info"));
// test for info , default queue and jobs
bytes = new ByteArrayOutputStream();
System.setOut(new PrintStream(bytes));
String[] arg3 = { "-info", "default", "-showJobs" };
jobClient.run(arg3);
assertTrue(bytes.toString().contains("Queue Name : default"));
assertTrue(bytes.toString().contains("Queue State : running"));
assertTrue(bytes.toString().contains("Scheduling Info"));
assertTrue(bytes.toString().contains("job_1"));
// with no arguments, run() should just print usage and return
String[] arg4 = {};
jobClient.run(arg4);
} finally {
System.setOut(oldOut);
if (fileSys != null) {
fileSys.delete(testDir, true);
}
if (mr != null) {
mr.stop();
}
}
}
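The test above redirects System.out four times to capture CLI output; a sketch of a helper that would factor out the capture/restore dance (hypothetical name runAndCapture, not in this patch):

private static String runAndCapture(JobQueueClient jobClient, String... args)
throws Exception {
PrintStream saved = System.out;
ByteArrayOutputStream captured = new ByteArrayOutputStream();
try {
// route stdout into the buffer while the command runs
System.setOut(new PrintStream(captured));
jobClient.run(args);
} finally {
System.setOut(saved);
}
return captured.toString();
}

Each block would then reduce to, e.g., assertTrue(runAndCapture(jobClient, "-list").contains("Queue Name : default"));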
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import static org.junit.Assert.*;
import org.junit.Test;
public class TestQueueConfigurationParser {
/**
* test XML generation via QueueConfigurationParser.getQueueElement
* @throws ParserConfigurationException
* @throws Exception
*/
@Test (timeout=5000)
public void testQueueConfigurationParser()
throws ParserConfigurationException, Exception {
JobQueueInfo info = new JobQueueInfo("root", "rootInfo");
JobQueueInfo infoChild1 = new JobQueueInfo("child1", "child1Info");
JobQueueInfo infoChild2 = new JobQueueInfo("child2", "child2Info");
info.addChild(infoChild1);
info.addChild(infoChild2);
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
.newInstance();
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document document = builder.newDocument();
// test QueueConfigurationParser.getQueueElement
Element e = QueueConfigurationParser.getQueueElement(document, info);
// transform result to string for check
DOMSource domSource = new DOMSource(e);
StringWriter writer = new StringWriter();
StreamResult result = new StreamResult(writer);
TransformerFactory tf = TransformerFactory.newInstance();
Transformer transformer = tf.newTransformer();
transformer.transform(domSource, result);
String str = writer.toString();
assertTrue(str
.endsWith("<queue><name>root</name><properties/><state>running</state><queue><name>child1</name><properties/><state>running</state></queue><queue><name>child2</name><properties/><state>running</state></queue></queue>"));
}
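The DOM-to-String conversion above is the standard JAXP identity transform; under the same imports it could be factored into a reusable helper (hypothetical name toXmlString, not part of this patch):

private static String toXmlString(Element e) throws Exception {
// identity transform: serialize the element tree to a string
StringWriter writer = new StringWriter();
TransformerFactory.newInstance().newTransformer()
.transform(new DOMSource(e), new StreamResult(writer));
return writer.toString();
}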
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
@ -24,6 +26,7 @@ import org.apache.hadoop.mapred.StatisticsCollector.Stat;
public class TestStatisticsCollector extends TestCase {
@SuppressWarnings("rawtypes")
public void testMovingWindow() throws Exception {
StatisticsCollector collector = new StatisticsCollector(1);
TimeWindow window = new TimeWindow("test", 6, 2);
@ -78,6 +81,28 @@ public class TestStatisticsCollector extends TestCase{
collector.update();
assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
assertEquals(95, stat.getValues().get(sincStart).getValue());
// test the Stat class
Map updaters = collector.getUpdaters();
assertEquals(updaters.size(), 2);
Map<String, Stat> statistics = collector.getStatistics();
assertNotNull(statistics.get("m1"));
Stat newStat = collector.createStat("m2");
assertEquals(newStat.name, "m2");
Stat st = collector.removeStat("m1");
assertEquals(st.name, "m1");
// removing a stat must not disturb values already aggregated
assertEquals((10+10+10+12+13+14), stat.getValues().get(window).getValue());
assertEquals(95, stat.getValues().get(sincStart).getValue());
// removing the same stat again should return null
st = collector.removeStat("m1");
assertNull(st);
collector.start();
// wait 2.5 seconds so the collector thread refreshes the windows
Thread.sleep(2500);
assertEquals(69, stat.getValues().get(window).getValue());
assertEquals(95, stat.getValues().get(sincStart).getValue());
}
}
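A minimal usage sketch of the collector API exercised above (it assumes same-package access, as the test has, since update() is not public; the stat name "requests" is illustrative):

StatisticsCollector c = new StatisticsCollector(1);
StatisticsCollector.Stat s = c.createStat("requests");
s.inc(5); // accumulate into every registered window
c.update(); // normally driven by the collector's internal timer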

View File

@ -61,11 +61,12 @@ public class TestTextInputFormat {
throw new RuntimeException("init failure", e);
}
}
@SuppressWarnings("deprecation")
private static Path workDir =
new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestTextInputFormat").makeQualified(localFs);
@Test (timeout=500000)
public void testFormat() throws Exception {
JobConf job = new JobConf(defaultConf);
Path file = new Path(workDir, "test.txt");
@ -145,7 +146,7 @@ public class TestTextInputFormat {
}
}
@Test (timeout=900000)
public void testSplitableCodecs() throws IOException {
JobConf conf = new JobConf(defaultConf);
int seed = new Random().nextInt();
@ -250,7 +251,7 @@ public class TestTextInputFormat {
bufsz);
}
@Test (timeout=5000)
public void testUTF8() throws Exception {
LineReader in = makeStream("abcd\u20acbdcd\u20ac");
Text line = new Text();
@ -269,7 +270,7 @@ public class TestTextInputFormat {
*
* @throws Exception
*/
@Test (timeout=5000)
public void testNewLines() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\r\r\n\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@ -309,7 +310,7 @@ public class TestTextInputFormat {
*
* @throws Exception
*/
@Test (timeout=5000)
public void testMaxLineLength() throws Exception {
final String STR = "a\nbb\n\nccc\rdddd\r\neeeee";
final int STRLENBYTES = STR.getBytes().length;
@ -334,7 +335,7 @@ public class TestTextInputFormat {
}
}
@Test (timeout=5000)
public void testMRMaxLine() throws Exception {
final int MAXPOS = 1024 * 1024;
final int MAXLINE = 10 * 1024;
@ -354,6 +355,9 @@ public class TestTextInputFormat {
position += b.length;
return b.length;
}
public void reset() {
position = 0;
}
};
final LongWritable key = new LongWritable();
final Text val = new Text();
@ -362,8 +366,14 @@ public class TestTextInputFormat {
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, MAXLINE);
conf.setInt("io.file.buffer.size", BUF); // used by LRR
// test another constructor
LineRecordReader lrr = new LineRecordReader(infNull, 0, MAXPOS, conf);
assertFalse("Read a line from null", lrr.next(key, val));
infNull.reset();
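// assumed signature of the 4-arg form exercised next (per the mapred
// LineRecordReader source): LineRecordReader(InputStream in, long offset,
// long maxLineLength, int maxBytesToConsume)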
lrr = new LineRecordReader(infNull, 0L, MAXLINE, MAXPOS);
assertFalse("Read a line from null", lrr.next(key, val));
}
private static void writeFile(FileSystem fs, Path name,
@ -400,7 +410,7 @@ public class TestTextInputFormat {
/**
* Test using the gzip codec for reading
*/
@Test (timeout=5000)
public void testGzip() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();
@ -434,7 +444,7 @@ public class TestTextInputFormat {
/**
* Test using the gzip codec and an empty input file
*/
@Test (timeout=5000)
public void testGzipEmpty() throws IOException {
JobConf job = new JobConf(defaultConf);
CompressionCodec gzip = new GzipCodec();

View File

@ -44,7 +44,6 @@ public class TestTextOutputFormat extends TestCase {
"data"), "data"),
FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt); FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
@SuppressWarnings("unchecked")
public void testFormat() throws Exception {
JobConf job = new JobConf();
job.set(JobContext.TASK_ATTEMPT_ID, attempt);
@ -59,8 +58,8 @@ public class TestTextOutputFormat extends TestCase {
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
RecordWriter<Object,Object> theRecordWriter =
theOutputFormat.getRecordWriter(localFs, job, file, reporter);
Text key1 = new Text("key1");
@ -95,7 +94,6 @@ public class TestTextOutputFormat extends TestCase {
}
@SuppressWarnings("unchecked")
public void testFormatWithCustomSeparator() throws Exception {
JobConf job = new JobConf();
String separator = "\u0001";
@ -112,8 +110,8 @@ public class TestTextOutputFormat extends TestCase {
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
RecordWriter<Object,Object> theRecordWriter =
theOutputFormat.getRecordWriter(localFs, job, file, reporter);
Text key1 = new Text("key1");
@ -147,7 +145,61 @@ public class TestTextOutputFormat extends TestCase {
assertEquals(output, expectedOutput.toString());
}
/**
* test writing compressed output with a custom separator
* @throws IOException
*/
public void testCompress() throws IOException {
JobConf job = new JobConf();
String separator = "\u0001";
job.set("mapreduce.output.textoutputformat.separator", separator);
job.set(JobContext.TASK_ATTEMPT_ID, attempt);
job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, "true");
FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
if (!fs.mkdirs(workDir)) {
fail("Failed to create output directory");
}
String file = "test.txt";
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
TextOutputFormat<Object,Object> theOutputFormat = new TextOutputFormat<Object,Object>();
RecordWriter<Object,Object> theRecordWriter =
theOutputFormat.getRecordWriter(localFs, job, file, reporter);
Text key1 = new Text("key1");
Text key2 = new Text("key2");
Text val1 = new Text("val1");
Text val2 = new Text("val2");
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key1, val1);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val1);
theRecordWriter.write(nullWritable, val2);
theRecordWriter.write(key2, nullWritable);
theRecordWriter.write(key1, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key2, val2);
} finally {
theRecordWriter.close(reporter);
}
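// note: with compress=true the old-API TextOutputFormat appends the codec's
// default extension to the task output name (".gz" for its default GzipCodec),
// so reading the extension-less "test.txt" below assumes no extension was
// added -- an assumption worth double-checking against getRecordWriter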
File expectedFile = new File(new Path(workDir, file).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append(separator).append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append(separator).append(val2).append("\n");
String output = UtilsForTests.slurp(expectedFile);
assertEquals(output, expectedOutput.toString());
}
public static void main(String[] args) throws Exception {
new TestTextOutputFormat().testFormat();
}