From 3f01c481060585ccd37be9db8aa4d1e33d2e2d6b Mon Sep 17 00:00:00 2001 From: Ahmed Hussein Date: Mon, 27 Jan 2020 15:50:13 -0600 Subject: [PATCH] MAPREDUCE-7262. MRApp helpers block for long intervals (500ms) Signed-off-by: Jonathan Eagles --- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 83 +++++++------------ 1 file changed, 31 insertions(+), 52 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 70ea18a13b3..9a8280bf65c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -113,6 +113,8 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("unchecked") public class MRApp extends MRAppMaster { private static final Logger LOG = LoggerFactory.getLogger(MRApp.class); + private static final int WAIT_FOR_STATE_CNT = 200; + private static final int WAIT_FOR_STATE_INTERVAL= 50; /** * The available resource of each container allocated. @@ -322,13 +324,11 @@ public class MRApp extends MRAppMaster { JobStateInternal finalState) throws Exception { int timeoutSecs = 0; JobStateInternal iState = job.getInternalState(); - while (!finalState.equals(iState) && timeoutSecs++ < 20) { - System.out.println("Job Internal State is : " + iState - + " Waiting for Internal state : " + finalState); - Thread.sleep(500); + while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); iState = job.getInternalState(); } - System.out.println("Task Internal State is : " + iState); + LOG.info("Job {} Internal State is : {}", job.getID(), iState); Assert.assertEquals("Task Internal state is not correct (timedout)", finalState, iState); } @@ -336,17 +336,12 @@ public class MRApp extends MRAppMaster { public void waitForInternalState(TaskImpl task, TaskStateInternal finalState) throws Exception { int timeoutSecs = 0; - TaskReport report = task.getReport(); TaskStateInternal iState = task.getInternalState(); - while (!finalState.equals(iState) && timeoutSecs++ < 20) { - System.out.println("Task Internal State is : " + iState - + " Waiting for Internal state : " + finalState + " progress : " - + report.getProgress()); - Thread.sleep(500); - report = task.getReport(); + while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); iState = task.getInternalState(); } - System.out.println("Task Internal State is : " + iState); + LOG.info("Task {} Internal State is : {}", task.getID(), iState); Assert.assertEquals("Task Internal state is not correct (timedout)", finalState, iState); } @@ -354,17 +349,12 @@ public class MRApp extends MRAppMaster { public void waitForInternalState(TaskAttemptImpl attempt, TaskAttemptStateInternal finalState) throws Exception { int timeoutSecs = 0; - TaskAttemptReport report = attempt.getReport(); TaskAttemptStateInternal iState = attempt.getInternalState(); - while (!finalState.equals(iState) && timeoutSecs++ < 20) { - System.out.println("TaskAttempt Internal State is : " + iState - + " Waiting for Internal state : " + finalState + " progress : " - + report.getProgress()); - Thread.sleep(500); - report = attempt.getReport(); + while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); iState = attempt.getInternalState(); } - System.out.println("TaskAttempt Internal State is : " + iState); + LOG.info("TaskAttempt {} Internal State is : {}", attempt.getID(), iState); Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)", finalState, iState); } @@ -374,17 +364,12 @@ public class MRApp extends MRAppMaster { int timeoutSecs = 0; TaskAttemptReport report = attempt.getReport(); while (!finalState.equals(report.getTaskAttemptState()) && - timeoutSecs++ < 20) { - System.out.println( - "TaskAttempt " + attempt.getID().toString() + " State is : " - + report.getTaskAttemptState() - + " Waiting for state : " + finalState - + " progress : " + report.getProgress()); + timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); report = attempt.getReport(); - Thread.sleep(500); } - System.out.println("TaskAttempt State is : " - + report.getTaskAttemptState()); + LOG.info("TaskAttempt {} State is : {}", attempt.getID(), + report.getTaskAttemptState()); Assert.assertEquals("TaskAttempt state is not correct (timedout)", finalState, report.getTaskAttemptState()); @@ -418,14 +403,11 @@ public class MRApp extends MRAppMaster { int timeoutSecs = 0; TaskReport report = task.getReport(); while (!finalState.equals(report.getTaskState()) && - timeoutSecs++ < 20) { - System.out.println("Task State for " + task.getID() + " is : " - + report.getTaskState() + " Waiting for state : " + finalState - + " progress : " + report.getProgress()); + timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); report = task.getReport(); - Thread.sleep(500); } - System.out.println("Task State is : " + report.getTaskState()); + LOG.info("Task {} State is : {}", task.getID(), report.getTaskState()); Assert.assertEquals("Task state is not correct (timedout)", finalState, report.getTaskState()); } @@ -434,15 +416,11 @@ public class MRApp extends MRAppMaster { int timeoutSecs = 0; JobReport report = job.getReport(); while (!finalState.equals(report.getJobState()) && - timeoutSecs++ < 20) { - System.out.println("Job State is : " + report.getJobState() + - " Waiting for state : " + finalState + - " map progress : " + report.getMapProgress() + - " reduce progress : " + report.getReduceProgress()); + timeoutSecs++ < WAIT_FOR_STATE_CNT) { report = job.getReport(); - Thread.sleep(500); + Thread.sleep(WAIT_FOR_STATE_INTERVAL); } - System.out.println("Job State is : " + report.getJobState()); + LOG.info("Job {} State is : {}", job.getID(), report.getJobState()); Assert.assertEquals("Job state is not correct (timedout)", finalState, job.getState()); } @@ -453,12 +431,11 @@ public class MRApp extends MRAppMaster { waitForServiceToStop(20 * 1000)); } else { int timeoutSecs = 0; - while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) { - System.out.println("MRApp State is : " + getServiceState() - + " Waiting for state : " + finalState); - Thread.sleep(500); + while (!finalState.equals(getServiceState()) + && timeoutSecs++ < WAIT_FOR_STATE_CNT) { + Thread.sleep(WAIT_FOR_STATE_INTERVAL); } - System.out.println("MRApp State is : " + getServiceState()); + LOG.info("MRApp State is : {}", getServiceState()); Assert.assertEquals("MRApp state is not correct (timedout)", finalState, getServiceState()); } @@ -467,16 +444,18 @@ public class MRApp extends MRAppMaster { public void verifyCompleted() { for (Job job : getContext().getAllJobs().values()) { JobReport jobReport = job.getReport(); - System.out.println("Job start time :" + jobReport.getStartTime()); - System.out.println("Job finish time :" + jobReport.getFinishTime()); + LOG.info("Job start time :{}", jobReport.getStartTime()); + LOG.info("Job finish time :", jobReport.getFinishTime()); Assert.assertTrue("Job start time is not less than finish time", jobReport.getStartTime() <= jobReport.getFinishTime()); Assert.assertTrue("Job finish time is in future", jobReport.getFinishTime() <= System.currentTimeMillis()); for (Task task : job.getTasks().values()) { TaskReport taskReport = task.getReport(); - System.out.println("Task start time : " + taskReport.getStartTime()); - System.out.println("Task finish time : " + taskReport.getFinishTime()); + LOG.info("Task {} start time : {}", task.getID(), + taskReport.getStartTime()); + LOG.info("Task {} finish time : {}", task.getID(), + taskReport.getFinishTime()); Assert.assertTrue("Task start time is not less than finish time", taskReport.getStartTime() <= taskReport.getFinishTime()); for (TaskAttempt attempt : task.getAttempts().values()) {