From f3ff61865a82a19c281b9e46c4b039aa2c3a1877 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein Date: Tue, 28 Jan 2020 10:57:33 -0600 Subject: [PATCH] MAPREDUCE-7259. testSpeculateSuccessfulWithUpdateEvents fails Intermittently Signed-off-by: Jonathan Eagles (cherry picked from commit 08251538fe2550d9dd86f9daf79994f5b8bdf7fa) --- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 27 -- .../v2/TestSpeculativeExecutionWithMRApp.java | 298 ++++++++---------- 2 files changed, 123 insertions(+), 202 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 9a8280bf65c..4be80c44a3e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -22,11 +22,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Arrays; import java.util.EnumSet; -import java.util.List; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; @@ -375,30 +372,6 @@ public class MRApp extends MRAppMaster { report.getTaskAttemptState()); } - public void waitForState(TaskAttempt attempt, - TaskAttemptState...finalStates) throws Exception { - int timeoutSecs = 0; - TaskAttemptReport report = attempt.getReport(); - List targetStates = Arrays.asList(finalStates); - String statesValues = targetStates.stream().map(Object::toString).collect( - Collectors.joining(",")); - while (!targetStates.contains(report.getTaskAttemptState()) && - timeoutSecs++ < 20) { - System.out.println( - "TaskAttempt " + attempt.getID().toString() + " State is : " - + report.getTaskAttemptState() - + " Waiting for states: " + statesValues - + ". curent state is : " + report.getTaskAttemptState() - + ". progress : " + report.getProgress()); - report = attempt.getReport(); - Thread.sleep(500); - } - System.out.println("TaskAttempt State is : " - + report.getTaskAttemptState()); - Assert.assertTrue("TaskAttempt state is not correct (timedout)", - targetStates.contains(report.getTaskAttemptState())); - } - public void waitForState(Task task, TaskState finalState) throws Exception { int timeoutSecs = 0; TaskReport report = task.getReport(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index d4d432b94d8..2163d7bc170 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -18,16 +18,13 @@ package org.apache.hadoop.mapreduce.v2; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; - import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator; @@ -50,18 +47,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; -import org.apache.hadoop.yarn.util.SystemClock; -import org.junit.Rule; +import org.junit.Before; import org.junit.Test; -import com.google.common.base.Supplier; -import org.junit.rules.TestRule; -import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.model.Statement; /** * The type Test speculative execution with mr app. @@ -70,74 +61,11 @@ import org.junit.runners.model.Statement; @SuppressWarnings({ "unchecked", "rawtypes" }) @RunWith(Parameterized.class) public class TestSpeculativeExecutionWithMRApp { - /** Number of times to re-try the failing tests. */ - private static final int ASSERT_SPECULATIONS_COUNT_RETRIES = 3; private static final int NUM_MAPPERS = 5; private static final int NUM_REDUCERS = 0; /** - * Speculation has non-deterministic behavior due to racing and timing. Use - * retry to verify that junit tests can pass. - */ - @Retention(RetentionPolicy.RUNTIME) - public @interface Retry {} - - /** - * The type Retry rule. - */ - class RetryRule implements TestRule { - - private AtomicInteger retryCount; - - /** - * Instantiates a new Retry rule. - * - * @param retries the retries - */ - RetryRule(int retries) { - super(); - this.retryCount = new AtomicInteger(retries); - } - - @Override - public Statement apply(final Statement base, - final Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - Throwable caughtThrowable = null; - - while (retryCount.getAndDecrement() > 0) { - try { - base.evaluate(); - return; - } catch (Throwable t) { - if (retryCount.get() > 0 && - description.getAnnotation(Retry.class) != null) { - caughtThrowable = t; - System.out.println( - description.getDisplayName() + - ": Failed, " + - retryCount.toString() + - " retries remain"); - } else { - throw caughtThrowable; - } - } - } - } - }; - } - } - - /** - * The Rule. - */ - @Rule - public RetryRule rule = new RetryRule(ASSERT_SPECULATIONS_COUNT_RETRIES); - - /** - * Gets test parameters. + * Get test parameters. * * @return the test parameters */ @@ -151,6 +79,7 @@ public class TestSpeculativeExecutionWithMRApp { private Class estimatorClass; + private final ControlledClock controlledClk; /** * Instantiates a new Test speculative execution with mr app. * @@ -159,6 +88,12 @@ public class TestSpeculativeExecutionWithMRApp { public TestSpeculativeExecutionWithMRApp( Class estimatorKlass) { this.estimatorClass = estimatorKlass; + this.controlledClk = new ControlledClock(); + } + + @Before + public void setup() { + this.controlledClk.setTime(System.currentTimeMillis()); } /** @@ -166,16 +101,11 @@ public class TestSpeculativeExecutionWithMRApp { * * @throws Exception the exception */ - @Retry @Test (timeout = 360000) public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { - - Clock actualClock = SystemClock.getInstance(); - final ControlledClock clock = new ControlledClock(actualClock); - clock.setTime(System.currentTimeMillis()); - MRApp app = - new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); + new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, + controlledClk); Job job = app.submit(createConfiguration(), true, true); app.waitForState(job, JobState.RUNNING); @@ -187,19 +117,13 @@ public class TestSpeculativeExecutionWithMRApp { app.waitForState(taskIter.next(), TaskState.RUNNING); } - // Process the update events - clock.setTime(System.currentTimeMillis() + 2000); + // Process the update events. + controlledClk.tickMsec(1000L); EventHandler appEventHandler = app.getContext().getEventHandler(); for (Map.Entry mapTask : tasks.entrySet()) { for (Map.Entry taskAttempt : mapTask .getValue().getAttempts().entrySet()) { - TaskAttemptStatus status = - createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.8, - TaskAttemptState.RUNNING); - TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); - appEventHandler.handle(event); + updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.8f); } } @@ -210,34 +134,26 @@ public class TestSpeculativeExecutionWithMRApp { // Other than one random task, finish every other task. for (Map.Entry mapTask : tasks.entrySet()) { - for (Map.Entry taskAttempt : mapTask - .getValue().getAttempts().entrySet()) { - if (mapTask.getKey() != taskToBeSpeculated.getID()) { - appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_DONE)); - appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); - app.waitForState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED, - TaskAttemptState.KILLED); + if (mapTask.getKey() != taskToBeSpeculated.getID()) { + for (Map.Entry taskAttempt : mapTask + .getValue().getAttempts().entrySet()) { + TaskAttemptId taId = taskAttempt.getKey(); + if (taId.getId() > 0) { + // in case the speculator started a speculative TA, then skip it. + continue; + } + markTACompleted(appEventHandler, taskAttempt.getValue()); + waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED, + controlledClk); } } } - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - if (taskToBeSpeculated.getAttempts().size() != 2) { - clock.setTime(System.currentTimeMillis() + 1000); - return false; - } else { - return true; - } - } - }, 1000, 60000); + controlledClk.tickMsec(2000L); + waitForSpeculation(taskToBeSpeculated, controlledClk); // finish 1st TA, 2nd will be killed TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); - verifySpeculationMessage(app, ta); - app.waitForState(Service.STATE.STOPPED); + waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk); + waitForAppStop(app, controlledClk); } /** @@ -245,16 +161,11 @@ public class TestSpeculativeExecutionWithMRApp { * * @throws Exception the exception */ - @Retry @Test (timeout = 360000) public void testSpeculateSuccessfulWithUpdateEvents() throws Exception { - - Clock actualClock = SystemClock.getInstance(); - final ControlledClock clock = new ControlledClock(actualClock); - clock.setTime(System.currentTimeMillis()); - MRApp app = - new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, clock); + new MRApp(NUM_MAPPERS, NUM_REDUCERS, false, "test", true, + controlledClk); Job job = app.submit(createConfiguration(), true, true); app.waitForState(job, JobState.RUNNING); @@ -266,103 +177,77 @@ public class TestSpeculativeExecutionWithMRApp { app.waitForState(taskIter.next(), TaskState.RUNNING); } - // Process the update events - clock.setTime(System.currentTimeMillis() + 1000); + // process the update events. Note that we should avoid advancing the clock + // by a value that triggers a speculation scan while updating the task + // progress, because the speculator may concurrently speculate tasks before + // we update their progress. + controlledClk.tickMsec(2000L); EventHandler appEventHandler = app.getContext().getEventHandler(); for (Map.Entry mapTask : tasks.entrySet()) { for (Map.Entry taskAttempt : mapTask .getValue().getAttempts().entrySet()) { - TaskAttemptStatus status = - createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.5, - TaskAttemptState.RUNNING); - TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); - appEventHandler.handle(event); + updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.5f); } } Task speculatedTask = null; int numTasksToFinish = NUM_MAPPERS + NUM_REDUCERS - 1; - clock.setTime(System.currentTimeMillis() + 1000); + controlledClk.tickMsec(1000L); for (Map.Entry task : tasks.entrySet()) { for (Map.Entry taskAttempt : task.getValue() .getAttempts().entrySet()) { - if (numTasksToFinish > 0) { - appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_DONE)); - appEventHandler.handle(new TaskAttemptEvent(taskAttempt.getKey(), - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + TaskAttemptId taId = taskAttempt.getKey(); + if (numTasksToFinish > 0 && taId.getId() == 0) { + // Skip speculative attempts if any. + markTACompleted(appEventHandler, taskAttempt.getValue()); numTasksToFinish--; - app.waitForState(taskAttempt.getValue(), TaskAttemptState.KILLED, - TaskAttemptState.SUCCEEDED); + waitForTAState(taskAttempt.getValue(), TaskAttemptState.SUCCEEDED, + controlledClk); } else { // The last task is chosen for speculation - TaskAttemptStatus status = - createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, - TaskAttemptState.RUNNING); speculatedTask = task.getValue(); - TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); - appEventHandler.handle(event); + updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f); } } } - clock.setTime(System.currentTimeMillis() + 15000); + controlledClk.tickMsec(15000L); for (Map.Entry task : tasks.entrySet()) { for (Map.Entry taskAttempt : task.getValue() .getAttempts().entrySet()) { + // Skip task attempts that are finished or killed. if (!(taskAttempt.getValue().getState() == TaskAttemptState.SUCCEEDED || taskAttempt.getValue().getState() == TaskAttemptState.KILLED)) { - TaskAttemptStatus status = - createTaskAttemptStatus(taskAttempt.getKey(), (float) 0.75, - TaskAttemptState.RUNNING); - TaskAttemptStatusUpdateEvent event = - new TaskAttemptStatusUpdateEvent(taskAttempt.getKey(), - new AtomicReference<>(status)); - appEventHandler.handle(event); + updateTaskProgress(appEventHandler, taskAttempt.getValue(), 0.75f); } } } final Task speculatedTaskConst = speculatedTask; - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - if (speculatedTaskConst.getAttempts().size() != 2) { - clock.setTime(System.currentTimeMillis() + 1000); - return false; - } else { - return true; - } - } - }, 1000, 60000); + waitForSpeculation(speculatedTaskConst, controlledClk); + TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask); - verifySpeculationMessage(app, ta); - app.waitForState(Service.STATE.STOPPED); + waitForTAState(ta[0], TaskAttemptState.SUCCEEDED, controlledClk); + waitForAppStop(app, controlledClk); } private static TaskAttempt[] makeFirstAttemptWin( EventHandler appEventHandler, Task speculatedTask) { - // finish 1st TA, 2nd will be killed Collection attempts = speculatedTask.getAttempts().values(); TaskAttempt[] ta = new TaskAttempt[attempts.size()]; attempts.toArray(ta); - appEventHandler.handle( - new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); - appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + markTACompleted(appEventHandler, ta[0]); return ta; } - private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) - throws Exception { - app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); - // The speculative attempt may be not killed before the MR job succeeds. + private static void markTACompleted( + EventHandler appEventHandler, TaskAttempt attempt) { + appEventHandler.handle( + new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE)); + appEventHandler.handle(new TaskAttemptEvent(attempt.getID(), + TaskAttemptEventType.TA_CONTAINER_COMPLETED)); } private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, @@ -387,6 +272,69 @@ public class TestSpeculativeExecutionWithMRApp { MRJobConfig.MR_AM_TASK_ESTIMATOR_SIMPLE_SMOOTH_LAMBDA_MS, 1000L * 10); } + conf.setLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE, + 3000L); return conf; } + + /** + * Wait for MRapp to stop while incrementing the controlled clock. + * @param app the MRApp to be stopped. + * @param cClock the controlled clock of the test. + * @throws TimeoutException + * @throws InterruptedException + */ + private void waitForAppStop(final MRApp app, final ControlledClock cClock) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + if (app.getServiceState() != Service.STATE.STOPPED) { + cClock.tickMsec(250L); + return false; + } + return true; + }, 250, 60000); + } + + /** + * Wait for the task to trigger a new speculation. + * @param speculatedTask the task we are monitoring. + * @param cClock the controlled clock of the test. + * @throws TimeoutException + * @throws InterruptedException + */ + private void waitForSpeculation(final Task speculatedTask, + final ControlledClock cClock) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + if (speculatedTask.getAttempts().size() != 2) { + cClock.tickMsec(250L); + return false; + } + return true; + }, 250, 60000); + } + + public void waitForTAState(TaskAttempt attempt, + TaskAttemptState finalState, final ControlledClock cClock) + throws Exception { + GenericTestUtils.waitFor(() -> { + if (attempt.getReport().getTaskAttemptState() != finalState) { + cClock.tickMsec(250L); + return false; + } + return true; + }, 250, 10000); + } + + private void updateTaskProgress(EventHandler appEventHandler, + TaskAttempt attempt, float newProgress) { + TaskAttemptStatus status = + createTaskAttemptStatus(attempt.getID(), newProgress, + TaskAttemptState.RUNNING); + TaskAttemptStatusUpdateEvent event = + new TaskAttemptStatusUpdateEvent(attempt.getID(), + new AtomicReference<>(status)); + appEventHandler.handle(event); + } + }