From baefc65f8053aff32f94fe15d24bdea239626e27 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 19 Nov 2019 19:29:25 -0800 Subject: [PATCH] Retrying with a backward compatible task type on unknown task type error in parallel indexing (#8905) * Retrying with a backward compatible task type on unknown task type error in parallel indexing * Register legacy class; add a serde test --- .../druid/indexing/common/task/Task.java | 32 ++++--- .../parallel/LegacySinglePhaseSubTask.java | 68 ++++++++++++++ .../batch/parallel/ParallelIndexIOConfig.java | 2 +- .../batch/parallel/SinglePhaseSubTask.java | 1 + .../parallel/SinglePhaseSubTaskSpec.java | 17 ++++ .../task/batch/parallel/SubTaskSpec.java | 17 ++++ .../task/batch/parallel/TaskMonitor.java | 44 +++++++-- .../druid/indexing/common/TestUtils.java | 3 + .../task/NoopIndexTaskClientFactory.java | 39 ++++++++ .../parallel/SinglePhaseSubTaskSpecTest.java | 91 +++++++++++++++++++ .../task/batch/parallel/TaskMonitorTest.java | 81 +++++++++++++++-- 11 files changed, 365 insertions(+), 30 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 58612ce523e..4a511211a89 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -20,11 +20,13 @@ package org.apache.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask; @@ -48,21 +50,21 @@ import java.util.Map; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "kill", value = KillTask.class), - @JsonSubTypes.Type(name = "move", value = MoveTask.class), - @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class), - @JsonSubTypes.Type(name = "restore", value = RestoreTask.class), - @JsonSubTypes.Type(name = "index", value = IndexTask.class), - @JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class), - @JsonSubTypes.Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class), - @JsonSubTypes.Type(name = "index_sub", value = SinglePhaseSubTask.class), // for backward compatibility - @JsonSubTypes.Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class), - @JsonSubTypes.Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class), - @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class), - @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class), - @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class), - @JsonSubTypes.Type(name = "noop", value = NoopTask.class), - @JsonSubTypes.Type(name = "compact", value = CompactionTask.class) + @Type(name = "kill", value = KillTask.class), + @Type(name = "move", value = MoveTask.class), + @Type(name = "archive", value = ArchiveTask.class), + @Type(name = "restore", value = RestoreTask.class), + @Type(name = "index", value = IndexTask.class), + @Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class), + @Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class), + @Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class), // for backward compatibility + @Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class), + @Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class), + @Type(name = "index_hadoop", value = HadoopIndexTask.class), + @Type(name = "index_realtime", value = RealtimeIndexTask.class), + @Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class), + @Type(name = "noop", value = NoopTask.class), + @Type(name = "compact", value = CompactionTask.class) }) public interface Task { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java new file mode 100644 index 00000000000..4e842f2684f --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.indexing.common.task.IndexTaskClientFactory; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; + +import javax.annotation.Nullable; +import java.util.Map; + +public class LegacySinglePhaseSubTask extends SinglePhaseSubTask +{ + @JsonCreator + public LegacySinglePhaseSubTask( + @JsonProperty("id") @Nullable final String id, + @JsonProperty("groupId") final String groupId, + @JsonProperty("resource") final TaskResource taskResource, + @JsonProperty("supervisorTaskId") final String supervisorTaskId, + @JsonProperty("numAttempts") final int numAttempts, // zero-based counting + @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema, + @JsonProperty("context") final Map context, + @JacksonInject IndexingServiceClient indexingServiceClient, + @JacksonInject IndexTaskClientFactory taskClientFactory, + @JacksonInject AppenderatorsManager appenderatorsManager + ) + { + super( + id, + groupId, + taskResource, + supervisorTaskId, + numAttempts, + ingestionSchema, + context, + indexingServiceClient, + taskClientFactory, + appenderatorsManager + ); + } + + @Override + public String getType() + { + return SinglePhaseSubTask.OLD_TYPE_NAME; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java index b272dd87616..860e81673c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java @@ -37,7 +37,7 @@ public class ParallelIndexIOConfig extends IndexIOConfig { @JsonCreator public ParallelIndexIOConfig( - @JsonProperty("firehose") FirehoseFactory firehoseFactory, + @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory, @JsonProperty("inputSource") @Nullable InputSource inputSource, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 6408299d084..762dc7dee60 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -100,6 +100,7 @@ import java.util.stream.Collectors; public class SinglePhaseSubTask extends AbstractBatchIndexTask { public static final String TYPE = "single_phase_sub_task"; + public static final String OLD_TYPE_NAME = "index_sub"; private static final Logger LOG = new Logger(SinglePhaseSubTask.class); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java index 8df941c806c..2afa7eaa1c3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java @@ -66,4 +66,21 @@ class SinglePhaseSubTaskSpec extends SubTaskSpec new DummyForInjectionAppenderatorsManager() ); } + + @Override + public SinglePhaseSubTask newSubTaskWithBackwardCompatibleType(int numAttempts) + { + return new LegacySinglePhaseSubTask( + null, + getGroupId(), + null, + getSupervisorTaskId(), + numAttempts, + getIngestionSpec(), + getContext(), + null, + null, + new DummyForInjectionAppenderatorsManager() + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java index c11552e4a15..f23f260b208 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java @@ -80,5 +80,22 @@ public abstract class SubTaskSpec return inputSplit; } + /** + * Creates a new task for this SubTaskSpec. + */ public abstract T newSubTask(int numAttempts); + + /** + * Creates a new task but with a backward compatible type for this SubTaskSpec. This is to support to rolling update + * for parallel indexing task and subclasses override this method properly if its type name has changed between + * releases. See https://github.com/apache/incubator-druid/issues/8836 for more details. + * + * This method will be called if {@link #newSubTask} fails with an {@link IllegalStateException} with an error + * message starting with "Could not resolve type id". The failure of {@link #newSubTask} with this error is NOT + * recorded as a failed attempt in {@link TaskHistory}. + */ + public T newSubTaskWithBackwardCompatibleType(int numAttempts) + { + return newSubTask(numAttempts); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index e4065d27881..8e21b68ee60 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -222,11 +222,10 @@ public class TaskMonitor { synchronized (startStopLock) { if (!running) { - return Futures.immediateFailedFuture(new ISE("TaskMonitore is not running")); + return Futures.immediateFailedFuture(new ISE("TaskMonitor is not running")); } - final T task = spec.newSubTask(0); - log.info("Submitting a new task[%s] for spec[%s]", task.getId(), spec.getId()); - indexingServiceClient.runTask(task); + final T task = submitTask(spec, 0); + log.info("Submitted a new task[%s] for spec[%s]", task.getId(), spec.getId()); incrementNumRunningTasks(); final SettableFuture> taskFuture = SettableFuture.create(); @@ -248,9 +247,8 @@ public class TaskMonitor synchronized (startStopLock) { if (running) { final SubTaskSpec spec = monitorEntry.spec; - final T task = spec.newSubTask(monitorEntry.taskHistory.size() + 1); - log.info("Submitting a new task[%s] for retrying spec[%s]", task.getId(), spec.getId()); - indexingServiceClient.runTask(task); + final T task = submitTask(spec, monitorEntry.taskHistory.size() + 1); + log.info("Submitted a new task[%s] for retrying spec[%s]", task.getId(), spec.getId()); incrementNumRunningTasks(); runningTasks.put( @@ -265,6 +263,38 @@ public class TaskMonitor } } + private T submitTask(SubTaskSpec spec, int numAttempts) + { + T task = spec.newSubTask(numAttempts); + try { + indexingServiceClient.runTask(task); + } + catch (Exception e) { + if (isUnknownTypeIdException(e)) { + log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type."); + task = spec.newSubTaskWithBackwardCompatibleType(numAttempts); + indexingServiceClient.runTask(task); + } else { + throw e; + } + } + return task; + } + + private boolean isUnknownTypeIdException(Throwable e) + { + if (e instanceof IllegalStateException) { + if (e.getMessage() != null && e.getMessage().contains("Could not resolve type id")) { + return true; + } + } + if (e.getCause() != null) { + return isUnknownTypeIdException(e.getCause()); + } else { + return false; + } + } + private void incrementNumRunningTasks() { synchronized (taskCountLock) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java index 4c81da3bceb..723722343e6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java @@ -31,6 +31,8 @@ import org.apache.druid.data.input.impl.NoopInputFormat; import org.apache.druid.data.input.impl.NoopInputSource; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.IndexTaskClientFactory; +import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; @@ -95,6 +97,7 @@ public class TestUtils .addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of())) .addValue(AppenderatorsManager.class, new TestAppenderatorsManager()) .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller()) + .addValue(IndexTaskClientFactory.class, new NoopIndexTaskClientFactory()) ); jsonMapper.registerModule( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java new file mode 100644 index 00000000000..8138f7c5c71 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.indexing.common.IndexTaskClient; +import org.apache.druid.indexing.common.TaskInfoProvider; +import org.joda.time.Duration; + +public class NoopIndexTaskClientFactory implements IndexTaskClientFactory +{ + @Override + public IndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String callerId, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + throw new UnsupportedOperationException(); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java new file mode 100644 index 00000000000..039a4791759 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task.batch.parallel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class SinglePhaseSubTaskSpecTest +{ + private static final SinglePhaseSubTaskSpec SPEC = new SinglePhaseSubTaskSpec( + "id", + "groupId", + "supervisorTaskId", + new ParallelIndexIngestionSpec( + new DataSchema( + "dataSource", + new TimestampSpec(null, null, null), + new DimensionsSpec(null), + new AggregatorFactory[0], + null, + null + ), + new ParallelIndexIOConfig( + null, + new LocalInputSource(new File("baseDir"), "filter"), + new JsonInputFormat(null, null), + null + ), + null + ), + null, + new InputSplit<>("string split") + ); + + private ObjectMapper mapper; + + @Before + public void setup() + { + mapper = new TestUtils().getTestObjectMapper(); + } + + @Test + public void testNewSubTaskType() throws IOException + { + final SinglePhaseSubTask expected = SPEC.newSubTask(0); + final byte[] json = mapper.writeValueAsBytes(expected); + final Map actual = mapper.readValue(json, Map.class); + Assert.assertEquals(SinglePhaseSubTask.TYPE, actual.get("type")); + } + + @Test + public void testNewSubTaskWithBackwardCompatibleType() throws IOException + { + final SinglePhaseSubTask expected = SPEC.newSubTaskWithBackwardCompatibleType(0); + final byte[] json = mapper.writeValueAsBytes(expected); + final Map actual = mapper.readValue(json, Map.class); + Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type")); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index 604376891cf..07a231e31a1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.junit.After; import org.junit.Assert; @@ -77,7 +78,7 @@ public class TaskMonitorTest final List>> futures = IntStream .range(0, 10) .mapToObj(i -> monitor.submit( - new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0) + new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0, false) )) .collect(Collectors.toList()); for (int i = 0; i < futures.size(); i++) { @@ -98,7 +99,16 @@ public class TaskMonitorTest final List specs = IntStream .range(0, 10) .mapToObj( - i -> new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 2) + i -> new TestTaskSpec( + "specId" + i, + "groupId", + "supervisorId", + null, + new IntegerInputSplit(i), + 100L, + 2, + false + ) ) .collect(Collectors.toList()); final List>> futures = specs @@ -127,43 +137,97 @@ public class TaskMonitorTest } } + @Test + public void testResubmitWithOldType() throws InterruptedException, ExecutionException, TimeoutException + { + final List specs = IntStream + .range(0, 10) + .mapToObj( + i -> new TestTaskSpec( + "specId" + i, + "groupId", + "supervisorId", + null, + new IntegerInputSplit(i), + 100L, + 0, + true + ) + ) + .collect(Collectors.toList()); + final List>> futures = specs + .stream() + .map(monitor::submit) + .collect(Collectors.toList()); + for (int i = 0; i < futures.size(); i++) { + // # of threads of taskRunner is 5, and each task is expected to be run 3 times (with 2 retries), so the expected + // max timeout is 6 sec. We additionally wait 4 more seconds here to make sure the test passes. + final SubTaskCompleteEvent result = futures.get(i).get(2, TimeUnit.SECONDS); + Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId()); + Assert.assertEquals("specId" + i, result.getSpec().getId()); + + Assert.assertNotNull(result.getLastStatus()); + Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, result.getLastState()); + + final TaskHistory taskHistory = monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId()); + Assert.assertNotNull(taskHistory); + + final List attemptHistory = taskHistory.getAttemptHistory(); + Assert.assertNotNull(attemptHistory); + Assert.assertEquals(1, attemptHistory.size()); + Assert.assertEquals(TaskState.SUCCESS, attemptHistory.get(0).getStatusCode()); + } + } + private static class TestTaskSpec extends SubTaskSpec { private final long runTime; private final int numMaxFails; + private final boolean throwUnknownTypeIdError; private int numFails; - public TestTaskSpec( + TestTaskSpec( String id, String groupId, String supervisorTaskId, Map context, InputSplit inputSplit, long runTime, - int numMaxFails + int numMaxFails, + boolean throwUnknownTypeIdError ) { super(id, groupId, supervisorTaskId, context, inputSplit); this.runTime = runTime; this.numMaxFails = numMaxFails; + this.throwUnknownTypeIdError = throwUnknownTypeIdError; } @Override public TestTask newSubTask(int numAttempts) { - return new TestTask(getId(), runTime, numFails++ < numMaxFails); + return new TestTask(getId(), runTime, numFails++ < numMaxFails, throwUnknownTypeIdError); + } + + @Override + public TestTask newSubTaskWithBackwardCompatibleType(int numAttempts) + { + return new TestTask(getId(), runTime, numFails++ < numMaxFails, false); } } private static class TestTask extends NoopTask { private final boolean shouldFail; + private final boolean throwUnknownTypeIdError; - TestTask(String id, long runTime, boolean shouldFail) + TestTask(String id, long runTime, boolean shouldFail, boolean throwUnknownTypeIdError) { super(id, null, "testDataSource", runTime, 0, null, null, null); this.shouldFail = shouldFail; + this.throwUnknownTypeIdError = throwUnknownTypeIdError; } @Override @@ -185,6 +249,9 @@ public class TaskMonitorTest { final TestTask task = (TestTask) taskObject; tasks.put(task.getId(), TaskState.RUNNING); + if (task.throwUnknownTypeIdError) { + throw new RuntimeException(new ISE("Could not resolve type id 'test_task_id'")); + } taskRunner.submit(() -> tasks.put(task.getId(), task.run(null).getStatusCode())); return task.getId(); } @@ -213,7 +280,7 @@ public class TaskMonitorTest private static class IntegerInputSplit extends InputSplit { - public IntegerInputSplit(int split) + IntegerInputSplit(int split) { super(split); }