Retrying with a backward compatible task type on unknown task type error in parallel indexing (#8905)

* Retrying with a backward compatible task type on unknown task type error in parallel indexing

* Register legacy class; add a serde test
This commit is contained in:
Jihoon Son 2019-11-19 19:29:25 -08:00 committed by Gian Merlino
parent d0913475b7
commit baefc65f80
11 changed files with 365 additions and 30 deletions

View File

@ -20,11 +20,13 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask;
@ -48,21 +50,21 @@ import java.util.Map;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
@JsonSubTypes.Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
@JsonSubTypes.Type(name = "index_sub", value = SinglePhaseSubTask.class), // for backward compatibility
@JsonSubTypes.Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class),
@JsonSubTypes.Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
@Type(name = "kill", value = KillTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
@Type(name = "index", value = IndexTask.class),
@Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
@Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
@Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class), // for backward compatibility
@Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class),
@Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class),
@Type(name = "index_hadoop", value = HadoopIndexTask.class),
@Type(name = "index_realtime", value = RealtimeIndexTask.class),
@Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
@Type(name = "noop", value = NoopTask.class),
@Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import javax.annotation.Nullable;
import java.util.Map;
public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
{
@JsonCreator
public LegacySinglePhaseSubTask(
@JsonProperty("id") @Nullable final String id,
@JsonProperty("groupId") final String groupId,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("supervisorTaskId") final String supervisorTaskId,
@JsonProperty("numAttempts") final int numAttempts, // zero-based counting
@JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") final Map<String, Object> context,
@JacksonInject IndexingServiceClient indexingServiceClient,
@JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(
id,
groupId,
taskResource,
supervisorTaskId,
numAttempts,
ingestionSchema,
context,
indexingServiceClient,
taskClientFactory,
appenderatorsManager
);
}
@Override
public String getType()
{
return SinglePhaseSubTask.OLD_TYPE_NAME;
}
}

View File

@ -37,7 +37,7 @@ public class ParallelIndexIOConfig extends IndexIOConfig
{
@JsonCreator
public ParallelIndexIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting

View File

@ -100,6 +100,7 @@ import java.util.stream.Collectors;
public class SinglePhaseSubTask extends AbstractBatchIndexTask
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
private static final Logger LOG = new Logger(SinglePhaseSubTask.class);

View File

@ -66,4 +66,21 @@ class SinglePhaseSubTaskSpec extends SubTaskSpec<SinglePhaseSubTask>
new DummyForInjectionAppenderatorsManager()
);
}
@Override
public SinglePhaseSubTask newSubTaskWithBackwardCompatibleType(int numAttempts)
{
return new LegacySinglePhaseSubTask(
null,
getGroupId(),
null,
getSupervisorTaskId(),
numAttempts,
getIngestionSpec(),
getContext(),
null,
null,
new DummyForInjectionAppenderatorsManager()
);
}
}

View File

@ -80,5 +80,22 @@ public abstract class SubTaskSpec<T extends Task>
return inputSplit;
}
/**
* Creates a new task for this SubTaskSpec.
*/
public abstract T newSubTask(int numAttempts);
/**
* Creates a new task but with a backward compatible type for this SubTaskSpec. This is to support to rolling update
* for parallel indexing task and subclasses override this method properly if its type name has changed between
* releases. See https://github.com/apache/incubator-druid/issues/8836 for more details.
*
* This method will be called if {@link #newSubTask} fails with an {@link IllegalStateException} with an error
* message starting with "Could not resolve type id". The failure of {@link #newSubTask} with this error is NOT
* recorded as a failed attempt in {@link TaskHistory}.
*/
public T newSubTaskWithBackwardCompatibleType(int numAttempts)
{
return newSubTask(numAttempts);
}
}

View File

@ -222,11 +222,10 @@ public class TaskMonitor<T extends Task>
{
synchronized (startStopLock) {
if (!running) {
return Futures.immediateFailedFuture(new ISE("TaskMonitore is not running"));
return Futures.immediateFailedFuture(new ISE("TaskMonitor is not running"));
}
final T task = spec.newSubTask(0);
log.info("Submitting a new task[%s] for spec[%s]", task.getId(), spec.getId());
indexingServiceClient.runTask(task);
final T task = submitTask(spec, 0);
log.info("Submitted a new task[%s] for spec[%s]", task.getId(), spec.getId());
incrementNumRunningTasks();
final SettableFuture<SubTaskCompleteEvent<T>> taskFuture = SettableFuture.create();
@ -248,9 +247,8 @@ public class TaskMonitor<T extends Task>
synchronized (startStopLock) {
if (running) {
final SubTaskSpec<T> spec = monitorEntry.spec;
final T task = spec.newSubTask(monitorEntry.taskHistory.size() + 1);
log.info("Submitting a new task[%s] for retrying spec[%s]", task.getId(), spec.getId());
indexingServiceClient.runTask(task);
final T task = submitTask(spec, monitorEntry.taskHistory.size() + 1);
log.info("Submitted a new task[%s] for retrying spec[%s]", task.getId(), spec.getId());
incrementNumRunningTasks();
runningTasks.put(
@ -265,6 +263,38 @@ public class TaskMonitor<T extends Task>
}
}
private T submitTask(SubTaskSpec<T> spec, int numAttempts)
{
T task = spec.newSubTask(numAttempts);
try {
indexingServiceClient.runTask(task);
}
catch (Exception e) {
if (isUnknownTypeIdException(e)) {
log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type.");
task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
indexingServiceClient.runTask(task);
} else {
throw e;
}
}
return task;
}
private boolean isUnknownTypeIdException(Throwable e)
{
if (e instanceof IllegalStateException) {
if (e.getMessage() != null && e.getMessage().contains("Could not resolve type id")) {
return true;
}
}
if (e.getCause() != null) {
return isUnknownTypeIdException(e.getCause());
} else {
return false;
}
}
private void incrementNumRunningTasks()
{
synchronized (taskCountLock) {

View File

@ -31,6 +31,8 @@ import org.apache.druid.data.input.impl.NoopInputFormat;
import org.apache.druid.data.input.impl.NoopInputSource;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
@ -95,6 +97,7 @@ public class TestUtils
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, new TestAppenderatorsManager())
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
.addValue(IndexTaskClientFactory.class, new NoopIndexTaskClientFactory())
);
jsonMapper.registerModule(

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.joda.time.Duration;
public class NoopIndexTaskClientFactory implements IndexTaskClientFactory
{
@Override
public IndexTaskClient build(
TaskInfoProvider taskInfoProvider,
String callerId,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class SinglePhaseSubTaskSpecTest
{
private static final SinglePhaseSubTaskSpec SPEC = new SinglePhaseSubTaskSpec(
"id",
"groupId",
"supervisorTaskId",
new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
new TimestampSpec(null, null, null),
new DimensionsSpec(null),
new AggregatorFactory[0],
null,
null
),
new ParallelIndexIOConfig(
null,
new LocalInputSource(new File("baseDir"), "filter"),
new JsonInputFormat(null, null),
null
),
null
),
null,
new InputSplit<>("string split")
);
private ObjectMapper mapper;
@Before
public void setup()
{
mapper = new TestUtils().getTestObjectMapper();
}
@Test
public void testNewSubTaskType() throws IOException
{
final SinglePhaseSubTask expected = SPEC.newSubTask(0);
final byte[] json = mapper.writeValueAsBytes(expected);
final Map<String, Object> actual = mapper.readValue(json, Map.class);
Assert.assertEquals(SinglePhaseSubTask.TYPE, actual.get("type"));
}
@Test
public void testNewSubTaskWithBackwardCompatibleType() throws IOException
{
final SinglePhaseSubTask expected = SPEC.newSubTaskWithBackwardCompatibleType(0);
final byte[] json = mapper.writeValueAsBytes(expected);
final Map<String, Object> actual = mapper.readValue(json, Map.class);
Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type"));
}
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
@ -77,7 +78,7 @@ public class TaskMonitorTest
final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = IntStream
.range(0, 10)
.mapToObj(i -> monitor.submit(
new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0)
new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 0, false)
))
.collect(Collectors.toList());
for (int i = 0; i < futures.size(); i++) {
@ -98,7 +99,16 @@ public class TaskMonitorTest
final List<TestTaskSpec> specs = IntStream
.range(0, 10)
.mapToObj(
i -> new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, new IntegerInputSplit(i), 100L, 2)
i -> new TestTaskSpec(
"specId" + i,
"groupId",
"supervisorId",
null,
new IntegerInputSplit(i),
100L,
2,
false
)
)
.collect(Collectors.toList());
final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = specs
@ -127,43 +137,97 @@ public class TaskMonitorTest
}
}
@Test
public void testResubmitWithOldType() throws InterruptedException, ExecutionException, TimeoutException
{
final List<TestTaskSpec> specs = IntStream
.range(0, 10)
.mapToObj(
i -> new TestTaskSpec(
"specId" + i,
"groupId",
"supervisorId",
null,
new IntegerInputSplit(i),
100L,
0,
true
)
)
.collect(Collectors.toList());
final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = specs
.stream()
.map(monitor::submit)
.collect(Collectors.toList());
for (int i = 0; i < futures.size(); i++) {
// # of threads of taskRunner is 5, and each task is expected to be run 3 times (with 2 retries), so the expected
// max timeout is 6 sec. We additionally wait 4 more seconds here to make sure the test passes.
final SubTaskCompleteEvent<TestTask> result = futures.get(i).get(2, TimeUnit.SECONDS);
Assert.assertEquals("supervisorId", result.getSpec().getSupervisorTaskId());
Assert.assertEquals("specId" + i, result.getSpec().getId());
Assert.assertNotNull(result.getLastStatus());
Assert.assertEquals(TaskState.SUCCESS, result.getLastStatus().getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
final TaskHistory<TestTask> taskHistory = monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId());
Assert.assertNotNull(taskHistory);
final List<TaskStatusPlus> attemptHistory = taskHistory.getAttemptHistory();
Assert.assertNotNull(attemptHistory);
Assert.assertEquals(1, attemptHistory.size());
Assert.assertEquals(TaskState.SUCCESS, attemptHistory.get(0).getStatusCode());
}
}
private static class TestTaskSpec extends SubTaskSpec<TestTask>
{
private final long runTime;
private final int numMaxFails;
private final boolean throwUnknownTypeIdError;
private int numFails;
public TestTaskSpec(
TestTaskSpec(
String id,
String groupId,
String supervisorTaskId,
Map<String, Object> context,
InputSplit inputSplit,
long runTime,
int numMaxFails
int numMaxFails,
boolean throwUnknownTypeIdError
)
{
super(id, groupId, supervisorTaskId, context, inputSplit);
this.runTime = runTime;
this.numMaxFails = numMaxFails;
this.throwUnknownTypeIdError = throwUnknownTypeIdError;
}
@Override
public TestTask newSubTask(int numAttempts)
{
return new TestTask(getId(), runTime, numFails++ < numMaxFails);
return new TestTask(getId(), runTime, numFails++ < numMaxFails, throwUnknownTypeIdError);
}
@Override
public TestTask newSubTaskWithBackwardCompatibleType(int numAttempts)
{
return new TestTask(getId(), runTime, numFails++ < numMaxFails, false);
}
}
private static class TestTask extends NoopTask
{
private final boolean shouldFail;
private final boolean throwUnknownTypeIdError;
TestTask(String id, long runTime, boolean shouldFail)
TestTask(String id, long runTime, boolean shouldFail, boolean throwUnknownTypeIdError)
{
super(id, null, "testDataSource", runTime, 0, null, null, null);
this.shouldFail = shouldFail;
this.throwUnknownTypeIdError = throwUnknownTypeIdError;
}
@Override
@ -185,6 +249,9 @@ public class TaskMonitorTest
{
final TestTask task = (TestTask) taskObject;
tasks.put(task.getId(), TaskState.RUNNING);
if (task.throwUnknownTypeIdError) {
throw new RuntimeException(new ISE("Could not resolve type id 'test_task_id'"));
}
taskRunner.submit(() -> tasks.put(task.getId(), task.run(null).getStatusCode()));
return task.getId();
}
@ -213,7 +280,7 @@ public class TaskMonitorTest
private static class IntegerInputSplit extends InputSplit<Integer>
{
public IntegerInputSplit(int split)
IntegerInputSplit(int split)
{
super(split);
}