MAPREDUCE-4376. TestClusterMRNotification times out (Kihwal Lee via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1355124 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 2012-06-28 19:28:42 +00:00
parent 550853203b
commit aad2a6f0fc
4 changed files with 52 additions and 14 deletions

CHANGES.txt

@@ -194,6 +194,8 @@ Branch-2 ( Unreleased changes )
 
     MAPREDUCE-4372. Deadlock in Resource Manager (Devaraj K via bobby)
 
+    MAPREDUCE-4376. TestClusterMRNotification times out (Kihwal Lee via bobby)
+
 Release 2.0.0-alpha - 05-23-2012
 
   INCOMPATIBLE CHANGES

RMContainerAllocator.java

@@ -1097,10 +1097,17 @@ public class RMContainerAllocator extends RMContainerRequestor
     }
 
     ContainerId get(TaskAttemptId tId) {
+      Container taskContainer;
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        return maps.get(tId).getId();
+        taskContainer = maps.get(tId);
       } else {
-        return reduces.get(tId).getId();
+        taskContainer = reduces.get(tId);
+      }
+
+      if (taskContainer == null) {
+        return null;
+      } else {
+        return taskContainer.getId();
       }
     }
   }
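Note: the change above replaces a direct maps.get(tId).getId() dereference, which throws a NullPointerException whenever the attempt has not been assigned a container yet, with a null-guarded lookup that returns null instead. A minimal standalone sketch of the same pattern, using plain java.util.Map and made-up stand-in types (FakeContainer, String attempt ids) rather than the Hadoop classes:

import java.util.HashMap;
import java.util.Map;

// Sketch only: illustrates why the patch guards the map lookup before
// calling getId(); FakeContainer stands in for the YARN Container record.
public class NullSafeLookupSketch {
  static class FakeContainer {
    private final String id;
    FakeContainer(String id) { this.id = id; }
    String getId() { return id; }
  }

  private final Map<String, FakeContainer> maps = new HashMap<>();
  private final Map<String, FakeContainer> reduces = new HashMap<>();

  // Before the patch: return maps.get(attemptId).getId() -> NullPointerException
  // when no container is assigned. After the patch: return null instead.
  String get(String attemptId, boolean isMapTask) {
    FakeContainer taskContainer =
        isMapTask ? maps.get(attemptId) : reduces.get(attemptId);
    if (taskContainer == null) {
      return null; // caller is expected to handle "not assigned yet"
    }
    return taskContainer.getId();
  }

  public static void main(String[] args) {
    NullSafeLookupSketch s = new NullSafeLookupSketch();
    System.out.println(s.get("attempt_x_m_000000_0", true)); // prints null, no NPE
  }
}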

NotificationTestCase.java

@@ -91,26 +91,22 @@ public abstract class NotificationTestCase extends HadoopTestCase {
 
   public static class NotificationServlet extends HttpServlet {
     public static int counter = 0;
+    public static int failureCounter = 0;
     private static final long serialVersionUID = 1L;
 
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
         throws ServletException, IOException {
+      String queryString = req.getQueryString();
       switch (counter) {
         case 0:
-        {
-          assertTrue(req.getQueryString().contains("SUCCEEDED"));
-        }
-        break;
+          verifyQuery(queryString, "SUCCEEDED");
+          break;
         case 2:
-        {
-          assertTrue(req.getQueryString().contains("KILLED"));
-        }
-        break;
+          verifyQuery(queryString, "KILLED");
+          break;
         case 4:
-        {
-          assertTrue(req.getQueryString().contains("FAILED"));
-        }
-        break;
+          verifyQuery(queryString, "FAILED");
+          break;
       }
       if (counter % 2 == 0) {
         res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
@@ -120,6 +116,15 @@ public abstract class NotificationTestCase extends HadoopTestCase {
       }
       counter++;
     }
+
+    protected void verifyQuery(String query, String expected)
+        throws IOException {
+      if (query.contains(expected)) {
+        return;
+      }
+      failureCounter++;
+      assertTrue("The request (" + query + ") does not contain " + expected, false);
+    }
   }
 
   private String getNotificationUrlTemplate() {
@@ -147,10 +152,12 @@ public abstract class NotificationTestCase extends HadoopTestCase {
   }
 
   public void testMR() throws Exception {
     System.out.println(launchWordCount(this.createJobConf(),
                                        "a b c d e f g h", 1, 1));
     Thread.sleep(2000);
     assertEquals(2, NotificationServlet.counter);
+    assertEquals(0, NotificationServlet.failureCounter);
 
     Path inDir = new Path("notificationjob/input");
     Path outDir = new Path("notificationjob/output");
@@ -168,12 +175,14 @@ public abstract class NotificationTestCase extends HadoopTestCase {
                                                 outDir).getID());
     Thread.sleep(2000);
     assertEquals(4, NotificationServlet.counter);
+    assertEquals(0, NotificationServlet.failureCounter);
 
     // run a job with FAILED status
     System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
                                                 outDir).getID());
     Thread.sleep(2000);
     assertEquals(6, NotificationServlet.counter);
+    assertEquals(0, NotificationServlet.failureCounter);
   }
 
   private String launchWordCount(JobConf conf,
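Note: the verifyQuery/failureCounter change works around the fact that the assertions inside doGet run on the embedded HTTP server's worker thread, where an AssertionError never reaches JUnit; the failure is instead recorded in a static counter that the test thread asserts is still zero. A minimal sketch of the same cross-thread failure-recording pattern, using a plain background thread instead of a servlet (class and method names here are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a failure observed on a worker thread must be recorded
// somewhere the test thread can check, instead of being thrown in place.
public class CrossThreadFailureSketch {
  static final AtomicInteger failureCounter = new AtomicInteger(0);

  static void verify(String query, String expected) {
    if (!query.contains(expected)) {
      // Throwing here would only kill the worker thread; record it instead.
      failureCounter.incrementAndGet();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread worker =
        new Thread(() -> verify("jobId=1&status=SUCCEEDED", "SUCCEEDED"));
    worker.start();
    worker.join();

    // The "test thread" makes the pass/fail decision.
    if (failureCounter.get() != 0) {
      throw new AssertionError("notification verification failed on worker thread");
    }
    System.out.println("all notifications verified");
  }
}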

UtilsForTests.java

@@ -606,9 +606,14 @@ public class UtilsForTests {
     conf.setReducerClass(IdentityReducer.class);
 
     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    long sleepCount = 0;
     while (!job.isComplete()) {
       try {
+        if (sleepCount > 300) { // 30 seconds
+          throw new IOException("Job didn't finish in 30 seconds");
+        }
         Thread.sleep(100);
+        sleepCount++;
       } catch (InterruptedException e) {
         break;
       }
@@ -626,9 +631,14 @@ public class UtilsForTests {
     conf.setMaxMapAttempts(1);
 
     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    long sleepCount = 0;
     while (!job.isComplete()) {
       try {
+        if (sleepCount > 300) { // 30 seconds
+          throw new IOException("Job didn't finish in 30 seconds");
+        }
         Thread.sleep(100);
+        sleepCount++;
       } catch (InterruptedException e) {
         break;
       }
@@ -646,17 +656,27 @@ public class UtilsForTests {
     conf.setReducerClass(IdentityReducer.class);
 
     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
+    long sleepCount = 0;
     while (job.getJobState() != JobStatus.RUNNING) {
       try {
+        if (sleepCount > 300) { // 30 seconds
+          throw new IOException("Job didn't finish in 30 seconds");
+        }
         Thread.sleep(100);
+        sleepCount++;
       } catch (InterruptedException e) {
         break;
       }
     }
     job.killJob();
+    sleepCount = 0;
     while (job.cleanupProgress() == 0.0f) {
       try {
+        if (sleepCount > 2000) { // 20 seconds
+          throw new IOException("Job cleanup didn't start in 20 seconds");
+        }
         Thread.sleep(10);
+        sleepCount++;
       } catch (InterruptedException ie) {
         break;
       }
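Note: these hunks bound the previously unbounded sleep loops, so a job that never completes makes the test fail with an IOException after 30 (or 20) seconds instead of hanging until the surefire timeout, which is what made TestClusterMRNotification appear to time out. One way to factor out the pattern the patch applies inline is a small bounded-polling helper; the helper name and signature below are illustrative, not part of the patch:

import java.io.IOException;
import java.util.function.BooleanSupplier;

// Sketch of the bounded-polling pattern: poll a condition at a fixed
// interval and give up with an exception once a deadline is exceeded.
public class BoundedWaitSketch {
  static void waitFor(BooleanSupplier done, long intervalMs, long timeoutMs,
                      String what) throws IOException {
    long waited = 0;
    while (!done.getAsBoolean()) {
      if (waited > timeoutMs) {
        throw new IOException(what + " didn't finish in " + timeoutMs + " ms");
      }
      try {
        Thread.sleep(intervalMs);
        waited += intervalMs;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break; // mirror the patch: stop waiting if interrupted
      }
    }
  }

  public static void main(String[] args) throws IOException {
    long start = System.currentTimeMillis();
    // Example condition: becomes true after roughly half a second.
    waitFor(() -> System.currentTimeMillis() - start > 500, 100, 30000, "demo job");
    System.out.println("condition met, no hang");
  }
}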