mirror of https://github.com/apache/druid.git
Add error msg to parallel task's TaskStatus (#11486)
* Add error msg to parallel task's TaskStatus * Consolidate failure block * Add failure test * Make it fail * Add fail while stopped * Simplify hash task test using a runner that fails after so many runs (parameter) * Remove unthrown exception * Use runner names to identify phase * Added range partition kill test & fixed a timing bug with the custom runner * Forbidden api * Style * Unit test code cleanup * Added message to invalid state exception and improved readability of the phase error messages for the parallel task failure unit tests
This commit is contained in:
parent
cf674c833c
commit
a2da407b70
|
@ -59,14 +59,7 @@ public class TaskStatus
|
|||
|
||||
/**
|
||||
* All failed task status must have a non-null error message.
|
||||
* Use {@link #failure(String, String)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static TaskStatus failure(String taskId)
|
||||
{
|
||||
return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
|
||||
}
|
||||
|
||||
public static TaskStatus failure(String taskId, String errorMsg)
|
||||
{
|
||||
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
|
||||
|
|
|
@ -257,7 +257,7 @@ public class MaterializedViewSupervisorTest
|
|||
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
|
||||
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
|
||||
EasyMock.expect(taskStorage.getStatus("test_task1"))
|
||||
.andReturn(Optional.of(TaskStatus.failure("test_task1")))
|
||||
.andReturn(Optional.of(TaskStatus.failure("test_task1", "Dummy task status failure err message")))
|
||||
.anyTimes();
|
||||
EasyMock.expect(taskStorage.getStatus("test_task2"))
|
||||
.andReturn(Optional.of(TaskStatus.running("test_task2")))
|
||||
|
|
|
@ -170,7 +170,7 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
|
|||
.andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT))
|
||||
.anyTimes();
|
||||
EasyMock.expect(taskInfoProvider.getTaskStatus(TEST_ID))
|
||||
.andReturn(Optional.of(TaskStatus.failure(TEST_ID)))
|
||||
.andReturn(Optional.of(TaskStatus.failure(TEST_ID, "Dummy task status failure err message")))
|
||||
.anyTimes();
|
||||
replayAll();
|
||||
|
||||
|
|
|
@ -1188,7 +1188,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
|
||||
}
|
||||
EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message")));
|
||||
EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes();
|
||||
EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true);
|
||||
EasyMock.replay(taskStorage);
|
||||
|
@ -1277,7 +1277,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
.andReturn(ImmutableList.of(captured.getValue()))
|
||||
.anyTimes();
|
||||
EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message")));
|
||||
EasyMock.expect(taskStorage.getStatus(runningTaskId))
|
||||
.andReturn(Optional.of(TaskStatus.running(runningTaskId)))
|
||||
.anyTimes();
|
||||
|
|
|
@ -171,7 +171,7 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
|
|||
.andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT))
|
||||
.anyTimes();
|
||||
EasyMock.expect(taskInfoProvider.getTaskStatus(TEST_ID))
|
||||
.andReturn(Optional.of(TaskStatus.failure(TEST_ID)))
|
||||
.andReturn(Optional.of(TaskStatus.failure(TEST_ID, "Dummy task status failure err message")))
|
||||
.anyTimes();
|
||||
replayAll();
|
||||
|
||||
|
|
|
@ -1029,7 +1029,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
|
||||
}
|
||||
EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message")));
|
||||
EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes();
|
||||
EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true);
|
||||
EasyMock.replay(taskStorage);
|
||||
|
@ -1147,7 +1147,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
|
||||
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
|
||||
EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId())));
|
||||
.andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message")));
|
||||
EasyMock.expect(taskStorage.getStatus(runningTaskId))
|
||||
.andReturn(Optional.of(TaskStatus.running(runningTaskId)))
|
||||
.anyTimes();
|
||||
|
|
|
@ -142,7 +142,8 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
|||
|
||||
synchronized (this) {
|
||||
if (stopped) {
|
||||
return TaskStatus.failure(getId());
|
||||
String errMsg = "Attempting to run a task that has been stopped. See overlord & task logs for more details.";
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
} else {
|
||||
// Register the cleaner to interrupt the current thread first.
|
||||
// Since the resource closer cleans up the registered resources in LIFO order,
|
||||
|
|
|
@ -439,8 +439,12 @@ public class CompactionTask extends AbstractBatchIndexTask
|
|||
.collect(Collectors.toList());
|
||||
|
||||
if (indexTaskSpecs.isEmpty()) {
|
||||
log.warn("Can't find segments from inputSpec[%s], nothing to do.", ioConfig.getInputSpec());
|
||||
return TaskStatus.failure(getId());
|
||||
String msg = StringUtils.format(
|
||||
"Can't find segments from inputSpec[%s], nothing to do.",
|
||||
ioConfig.getInputSpec()
|
||||
);
|
||||
log.warn(msg);
|
||||
return TaskStatus.failure(getId(), msg);
|
||||
} else {
|
||||
registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
|
||||
final int totalNumSpecs = indexTaskSpecs.size();
|
||||
|
@ -450,8 +454,9 @@ public class CompactionTask extends AbstractBatchIndexTask
|
|||
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
|
||||
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
|
||||
if (!currentSubTaskHolder.setTask(eachSpec)) {
|
||||
log.info("Task is asked to stop. Finish as failed.");
|
||||
return TaskStatus.failure(getId());
|
||||
String errMsg = "Task was asked to stop. Finish as failed.";
|
||||
log.info(errMsg);
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
try {
|
||||
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
|
||||
|
@ -472,8 +477,10 @@ public class CompactionTask extends AbstractBatchIndexTask
|
|||
}
|
||||
}
|
||||
|
||||
log.info("Run [%d] specs, [%d] succeeded, [%d] failed", totalNumSpecs, totalNumSpecs - failCnt, failCnt);
|
||||
return failCnt == 0 ? TaskStatus.success(getId()) : TaskStatus.failure(getId());
|
||||
String msg = StringUtils.format("Ran [%d] specs, [%d] succeeded, [%d] failed",
|
||||
totalNumSpecs, totalNumSpecs - failCnt, failCnt);
|
||||
log.info(msg);
|
||||
return failCnt == 0 ? TaskStatus.success(getId()) : TaskStatus.failure(getId(), msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -405,13 +405,15 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
|
|||
if (specVersion.compareTo(version) < 0) {
|
||||
version = specVersion;
|
||||
} else {
|
||||
log.error(
|
||||
"Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
|
||||
specVersion,
|
||||
version
|
||||
);
|
||||
String errMsg =
|
||||
StringUtils.format(
|
||||
"Spec version can not be greater than or equal to the lock version, Spec version: [%s] Lock version: [%s].",
|
||||
specVersion,
|
||||
version
|
||||
);
|
||||
log.error(errMsg);
|
||||
toolbox.getTaskReportFileWriter().write(getId(), null);
|
||||
return TaskStatus.failure(getId());
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringS
|
|||
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.segment.indexing.TuningConfig;
|
||||
|
@ -123,6 +124,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
|
||||
private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
|
||||
|
||||
private static final String TASK_PHASE_FAILURE_MSG = "Failed in phase[%s]. See task logs for details.";
|
||||
|
||||
private final ParallelIndexIngestionSpec ingestionSchema;
|
||||
/**
|
||||
* Base name for the {@link SubTaskSpec} ID.
|
||||
|
@ -250,7 +253,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
}
|
||||
|
||||
@Nullable
|
||||
private <T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
|
||||
@VisibleForTesting
|
||||
<T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
|
||||
TaskToolbox toolbox,
|
||||
Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator
|
||||
)
|
||||
|
@ -555,14 +559,23 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
);
|
||||
|
||||
final TaskState state = runNextPhase(runner);
|
||||
TaskStatus taskStatus;
|
||||
if (state.isSuccess()) {
|
||||
//noinspection ConstantConditions
|
||||
publishSegments(toolbox, runner.getReports());
|
||||
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
|
||||
waitForSegmentAvailability(runner.getReports());
|
||||
}
|
||||
taskStatus = TaskStatus.success(getId());
|
||||
} else {
|
||||
// there is only success or failure after running....
|
||||
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
|
||||
final String errorMessage = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
runner.getName()
|
||||
);
|
||||
taskStatus = TaskStatus.failure(getId(), errorMessage);
|
||||
}
|
||||
TaskStatus taskStatus = TaskStatus.fromCode(getId(), state);
|
||||
toolbox.getTaskReportFileWriter().write(
|
||||
getId(),
|
||||
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
|
||||
|
@ -608,7 +621,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
}
|
||||
}
|
||||
|
||||
private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
|
||||
@VisibleForTesting
|
||||
TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
TaskState state;
|
||||
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
|
||||
|
@ -637,7 +651,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
|
||||
state = runNextPhase(cardinalityRunner);
|
||||
if (state.isFailure()) {
|
||||
return TaskStatus.failure(getId());
|
||||
String errMsg = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
cardinalityRunner.getName()
|
||||
);
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
if (cardinalityRunner.getReports().isEmpty()) {
|
||||
|
@ -683,7 +701,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
|
||||
state = runNextPhase(indexingRunner);
|
||||
if (state.isFailure()) {
|
||||
return TaskStatus.failure(getId());
|
||||
String errMsg = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
indexingRunner.getName()
|
||||
);
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
// 2. Partial segment merge phase
|
||||
|
@ -701,15 +723,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
|
||||
);
|
||||
state = runNextPhase(mergeRunner);
|
||||
TaskStatus taskStatus;
|
||||
if (state.isSuccess()) {
|
||||
//noinspection ConstantConditions
|
||||
publishSegments(toolbox, mergeRunner.getReports());
|
||||
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
|
||||
waitForSegmentAvailability(mergeRunner.getReports());
|
||||
}
|
||||
taskStatus = TaskStatus.success(getId());
|
||||
} else {
|
||||
// there is only success or failure after running....
|
||||
Preconditions.checkState(state.isFailure(), "Unrecognized state after task is complete[%s]", state);
|
||||
String errMsg = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
mergeRunner.getName()
|
||||
);
|
||||
taskStatus = TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
TaskStatus taskStatus = TaskStatus.fromCode(getId(), state);
|
||||
toolbox.getTaskReportFileWriter().write(
|
||||
getId(),
|
||||
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
|
||||
|
@ -717,7 +748,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
return taskStatus;
|
||||
}
|
||||
|
||||
private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
|
||||
@VisibleForTesting
|
||||
TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
|
||||
{
|
||||
ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
|
||||
ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> distributionRunner =
|
||||
|
@ -728,7 +760,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
|
||||
TaskState distributionState = runNextPhase(distributionRunner);
|
||||
if (distributionState.isFailure()) {
|
||||
return TaskStatus.failure(getId(), PartialDimensionDistributionTask.TYPE + " failed");
|
||||
String errMsg = StringUtils.format(TASK_PHASE_FAILURE_MSG, distributionRunner.getName());
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
Map<Interval, PartitionBoundaries> intervalToPartitions =
|
||||
|
@ -755,7 +788,11 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
|
||||
TaskState indexingState = runNextPhase(indexingRunner);
|
||||
if (indexingState.isFailure()) {
|
||||
return TaskStatus.failure(getId(), PartialRangeSegmentGenerateTask.TYPE + " failed");
|
||||
String errMsg = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
indexingRunner.getName()
|
||||
);
|
||||
return TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
// partition (interval, partitionId) -> partition locations
|
||||
|
@ -772,14 +809,23 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
|
||||
);
|
||||
TaskState mergeState = runNextPhase(mergeRunner);
|
||||
TaskStatus taskStatus;
|
||||
if (mergeState.isSuccess()) {
|
||||
publishSegments(toolbox, mergeRunner.getReports());
|
||||
if (awaitSegmentAvailabilityTimeoutMillis > 0) {
|
||||
waitForSegmentAvailability(mergeRunner.getReports());
|
||||
}
|
||||
taskStatus = TaskStatus.success(getId());
|
||||
} else {
|
||||
// there is only success or failure after running....
|
||||
Preconditions.checkState(mergeState.isFailure(), "Unrecognized state after task is complete[%s]", mergeState);
|
||||
String errMsg = StringUtils.format(
|
||||
TASK_PHASE_FAILURE_MSG,
|
||||
mergeRunner.getName()
|
||||
);
|
||||
taskStatus = TaskStatus.failure(getId(), errMsg);
|
||||
}
|
||||
|
||||
TaskStatus taskStatus = TaskStatus.fromCode(getId(), mergeState);
|
||||
toolbox.getTaskReportFileWriter().write(
|
||||
getId(),
|
||||
getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted)
|
||||
|
@ -1046,8 +1092,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
if (currentSubTaskHolder.setTask(indexTask) && indexTask.isReady(toolbox.getTaskActionClient())) {
|
||||
return indexTask.run(toolbox);
|
||||
} else {
|
||||
LOG.info("Task is asked to stop. Finish as failed");
|
||||
return TaskStatus.failure(getId());
|
||||
String msg = "Task was asked to stop. Finish as failed";
|
||||
LOG.info(msg);
|
||||
return TaskStatus.failure(getId(), msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -113,8 +113,7 @@ public class TestTasks
|
|||
while (!Thread.currentThread().isInterrupted()) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
return TaskStatus.failure(getId());
|
||||
return TaskStatus.failure(getId(), "Dummy task status failure for testing");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -173,7 +173,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
|
|||
return getIndexingServiceClient().getPublishedSegments(task);
|
||||
}
|
||||
|
||||
private ParallelIndexSupervisorTask newTask(
|
||||
protected ParallelIndexSupervisorTask newTask(
|
||||
@Nullable TimestampSpec timestampSpec,
|
||||
@Nullable DimensionsSpec dimensionsSpec,
|
||||
@Nullable InputFormat inputFormat,
|
||||
|
|
|
@ -444,7 +444,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
|
|||
if (task.isReady(toolbox.getTaskActionClient())) {
|
||||
return task.run(toolbox);
|
||||
} else {
|
||||
getTaskStorage().setStatus(TaskStatus.failure(task.getId()));
|
||||
getTaskStorage().setStatus(TaskStatus.failure(task.getId(), "Dummy task status failure for testing"));
|
||||
throw new ISE("task[%s] is not ready", task.getId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,461 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexing.common.task.batch.parallel;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.druid.data.input.InputFormat;
|
||||
import org.apache.druid.data.input.impl.CsvInputFormat;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.LocalInputSource;
|
||||
import org.apache.druid.data.input.impl.ParseSpec;
|
||||
import org.apache.druid.data.input.impl.StringInputRowParser;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||
import org.apache.druid.indexing.common.LockGranularity;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.task.Task;
|
||||
import org.apache.druid.indexing.common.task.TaskResource;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.guava.Comparators;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.segment.indexing.DataSchema;
|
||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Force and verify the failure modes for hash partitioning task
|
||||
*/
|
||||
public class HashPartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
|
||||
{
|
||||
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
|
||||
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
|
||||
);
|
||||
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
|
||||
Arrays.asList("ts", "dim1", "dim2", "val"),
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
0
|
||||
);
|
||||
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
|
||||
|
||||
private File inputDir;
|
||||
|
||||
|
||||
public HashPartitionTaskKillTest()
|
||||
{
|
||||
super(LockGranularity.TIME_CHUNK, true, 0, 0);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException
|
||||
{
|
||||
inputDir = temporaryFolder.newFolder("data");
|
||||
final Set<Interval> intervals = new HashSet<>();
|
||||
// set up data
|
||||
for (int i = 0; i < 10; i++) {
|
||||
try (final Writer writer =
|
||||
Files.newBufferedWriter(new File(inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8)) {
|
||||
for (int j = 0; j < 10; j++) {
|
||||
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
|
||||
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
|
||||
intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
|
||||
intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
try (final Writer writer =
|
||||
Files.newBufferedWriter(new File(inputDir, "filtered_" + i).toPath(), StandardCharsets.UTF_8)) {
|
||||
writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
|
||||
}
|
||||
}
|
||||
// sorted input intervals
|
||||
List<Interval> inputIntervals = new ArrayList<>(intervals);
|
||||
inputIntervals.sort(Comparators.intervalsByStartThenEnd());
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsInFirstPhase() throws Exception
|
||||
{
|
||||
final ParallelIndexSupervisorTask task =
|
||||
createTestTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, null, INTERVAL_TO_INDEX, inputDir,
|
||||
"test_*",
|
||||
new HashedPartitionsSpec(null, null, // num shards is null to force it to go to first phase
|
||||
ImmutableList.of("dim1", "dim2")
|
||||
),
|
||||
2, false, true, 0
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-1]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsInSecondPhase() throws Exception
|
||||
{
|
||||
final ParallelIndexSupervisorTask task =
|
||||
createTestTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, null, INTERVAL_TO_INDEX, inputDir,
|
||||
"test_*",
|
||||
new HashedPartitionsSpec(null, 3,
|
||||
ImmutableList.of("dim1", "dim2")
|
||||
),
|
||||
2, false, true, 0
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-2]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsInThirdPhase() throws Exception
|
||||
{
|
||||
final ParallelIndexSupervisorTask task =
|
||||
createTestTask(TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT,
|
||||
null,
|
||||
INTERVAL_TO_INDEX,
|
||||
inputDir,
|
||||
"test_*",
|
||||
new HashedPartitionsSpec(null, 3,
|
||||
ImmutableList.of("dim1", "dim2")
|
||||
),
|
||||
2,
|
||||
false,
|
||||
true,
|
||||
1
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
|
||||
TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-3]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
private ParallelIndexSupervisorTask createTestTask(
|
||||
@Nullable TimestampSpec timestampSpec,
|
||||
@Nullable DimensionsSpec dimensionsSpec,
|
||||
@Nullable InputFormat inputFormat,
|
||||
@Nullable ParseSpec parseSpec,
|
||||
Interval interval,
|
||||
File inputDir,
|
||||
String filter,
|
||||
PartitionsSpec partitionsSpec,
|
||||
int maxNumConcurrentSubTasks,
|
||||
boolean appendToExisting,
|
||||
boolean useInputFormatApi,
|
||||
int succeedsBeforeFailing
|
||||
)
|
||||
{
|
||||
GranularitySpec granularitySpec = new UniformGranularitySpec(
|
||||
SEGMENT_GRANULARITY,
|
||||
Granularities.MINUTE,
|
||||
interval == null ? null : Collections.singletonList(interval)
|
||||
);
|
||||
|
||||
ParallelIndexTuningConfig tuningConfig = newTuningConfig(
|
||||
partitionsSpec,
|
||||
maxNumConcurrentSubTasks,
|
||||
!appendToExisting
|
||||
);
|
||||
|
||||
final ParallelIndexIngestionSpec ingestionSpec;
|
||||
|
||||
if (useInputFormatApi) {
|
||||
Preconditions.checkArgument(parseSpec == null);
|
||||
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
|
||||
null,
|
||||
new LocalInputSource(inputDir, filter),
|
||||
inputFormat,
|
||||
appendToExisting,
|
||||
null
|
||||
);
|
||||
ingestionSpec = new ParallelIndexIngestionSpec(
|
||||
new DataSchema(
|
||||
DATASOURCE,
|
||||
timestampSpec,
|
||||
dimensionsSpec,
|
||||
new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
|
||||
granularitySpec,
|
||||
null
|
||||
),
|
||||
ioConfig,
|
||||
tuningConfig
|
||||
);
|
||||
} else {
|
||||
Preconditions.checkArgument(inputFormat == null);
|
||||
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
|
||||
new LocalFirehoseFactory(inputDir, filter, null),
|
||||
appendToExisting
|
||||
);
|
||||
//noinspection unchecked
|
||||
ingestionSpec = new ParallelIndexIngestionSpec(
|
||||
new DataSchema(
|
||||
"dataSource",
|
||||
getObjectMapper().convertValue(
|
||||
new StringInputRowParser(parseSpec, null),
|
||||
Map.class
|
||||
),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("val", "val")
|
||||
},
|
||||
granularitySpec,
|
||||
null,
|
||||
getObjectMapper()
|
||||
),
|
||||
ioConfig,
|
||||
tuningConfig
|
||||
);
|
||||
}
|
||||
|
||||
return new ParallelIndexSupervisorTaskTest(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
ingestionSpec,
|
||||
null,
|
||||
Collections.emptyMap(),
|
||||
succeedsBeforeFailing
|
||||
);
|
||||
}
|
||||
|
||||
static class ParallelIndexSupervisorTaskTest extends ParallelIndexSupervisorTask
|
||||
{
|
||||
private final int succeedsBeforeFailing;
|
||||
private int numRuns = 0;
|
||||
|
||||
public ParallelIndexSupervisorTaskTest(
|
||||
String id,
|
||||
@Nullable String groupId,
|
||||
TaskResource taskResource,
|
||||
ParallelIndexIngestionSpec ingestionSchema,
|
||||
@Nullable String baseSubtaskSpecName,
|
||||
Map<String, Object> context,
|
||||
int succedsBeforeFailing
|
||||
)
|
||||
{
|
||||
super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, context);
|
||||
this.succeedsBeforeFailing = succedsBeforeFailing;
|
||||
}
|
||||
|
||||
@Override
|
||||
<T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
|
||||
TaskToolbox toolbox,
|
||||
Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator
|
||||
)
|
||||
{
|
||||
|
||||
// for the hash partition task it is kind of hacky to figure out what is the failure phase
|
||||
// basically we force the failure in first phase by having numShards being null (this is
|
||||
// determined by the implementation of the run method -- which may change and suddenly this test
|
||||
// will break requiring messing with the logic below).
|
||||
// For the other two subsequent failures we need to have numShards non-null, so it bypasses
|
||||
// the first failure, so the conditions for failure in the different phase are given below:
|
||||
ParallelIndexTaskRunner<T, R> retVal;
|
||||
if (succeedsBeforeFailing == 0
|
||||
&& this.getIngestionSchema().getTuningConfig().getNumShards() == null) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, "PHASE-1");
|
||||
} else if (succeedsBeforeFailing == 0
|
||||
&& this.getIngestionSchema().getTuningConfig().getNumShards() != null) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, "PHASE-2");
|
||||
} else if (succeedsBeforeFailing == 1
|
||||
&& numRuns == 1
|
||||
&& this.getIngestionSchema().getTuningConfig().getNumShards() != null) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, "PHASE-3");
|
||||
} else {
|
||||
numRuns++;
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(true, "SUCCESFUL-PHASE");
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
static class TestRunner
|
||||
implements ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport>
|
||||
{
|
||||
|
||||
// These variables are at the class level since they are used to controlling after how many invocations of
|
||||
// run the runner should fail
|
||||
private final boolean succeeds;
|
||||
private final String phase;
|
||||
|
||||
TestRunner(boolean succeeds, String phase)
|
||||
{
|
||||
this.succeeds = succeeds;
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
if (succeeds) {
|
||||
return StringUtils.format(phase);
|
||||
} else {
|
||||
return StringUtils.format(phase);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskState run()
|
||||
{
|
||||
if (succeeds) {
|
||||
return TaskState.SUCCESS;
|
||||
}
|
||||
return TaskState.FAILED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopGracefully()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectReport(DimensionCardinalityReport report)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DimensionCardinalityReport> getReports()
|
||||
{
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParallelIndexingPhaseProgress getProgress()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRunningTaskIds()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionCardinalityTask>> getSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionCardinalityTask>> getRunningSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionCardinalityTask>> getCompleteSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public SubTaskSpec<PartialDimensionCardinalityTask> getSubTaskSpec(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public SubTaskSpecStatus getSubTaskState(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public TaskHistory<PartialDimensionCardinalityTask> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -126,8 +126,10 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
|
|||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
|
||||
final TaskState state = task.run(toolbox).getStatusCode();
|
||||
Assert.assertEquals(TaskState.FAILED, state);
|
||||
final TaskStatus taskStatus = task.run(toolbox);
|
||||
Assert.assertEquals("Failed in phase[segment generation]. See task logs for details.",
|
||||
taskStatus.getErrorMsg());
|
||||
Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode());
|
||||
|
||||
final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
|
||||
Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
|
||||
|
|
|
@ -0,0 +1,457 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexing.common.task.batch.parallel;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.data.input.InputFormat;
|
||||
import org.apache.druid.data.input.impl.CsvInputFormat;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.LocalInputSource;
|
||||
import org.apache.druid.data.input.impl.ParseSpec;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.indexer.TaskState;
|
||||
import org.apache.druid.indexer.TaskStatus;
|
||||
import org.apache.druid.indexer.partitions.PartitionsSpec;
|
||||
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
|
||||
import org.apache.druid.indexing.common.LockGranularity;
|
||||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.task.Task;
|
||||
import org.apache.druid.indexing.common.task.TaskResource;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import org.apache.druid.segment.indexing.DataSchema;
|
||||
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Force and verify the failure modes for range partitioning task
|
||||
*/
|
||||
public class RangePartitionTaskKillTest extends AbstractMultiPhaseParallelIndexingTest
|
||||
{
|
||||
private static final int NUM_PARTITION = 2;
|
||||
private static final int NUM_ROW = 20;
|
||||
private static final int DIM_FILE_CARDINALITY = 2;
|
||||
private static final int YEAR = 2017;
|
||||
private static final Interval INTERVAL_TO_INDEX = Intervals.of("%s-12/P1M", YEAR);
|
||||
private static final String TIME = "ts";
|
||||
private static final String DIM1 = "dim1";
|
||||
private static final String DIM2 = "dim2";
|
||||
private static final String LIST_DELIMITER = "|";
|
||||
private static final String TEST_FILE_NAME_PREFIX = "test_";
|
||||
private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
|
||||
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
|
||||
DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
|
||||
);
|
||||
|
||||
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
|
||||
Arrays.asList(TIME, DIM1, DIM2, "val"),
|
||||
LIST_DELIMITER,
|
||||
false,
|
||||
false,
|
||||
0
|
||||
);
|
||||
|
||||
private File inputDir;
|
||||
|
||||
public RangePartitionTaskKillTest()
|
||||
{
|
||||
super(LockGranularity.SEGMENT, true, DEFAULT_TRANSIENT_TASK_FAILURE_RATE, DEFAULT_TRANSIENT_API_FAILURE_RATE);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException
|
||||
{
|
||||
inputDir = temporaryFolder.newFolder("data");
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsFirstPhase() throws Exception
|
||||
{
|
||||
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
|
||||
final ParallelIndexSupervisorTask task =
|
||||
newTask(TIMESTAMP_SPEC,
|
||||
DIMENSIONS_SPEC,
|
||||
INPUT_FORMAT,
|
||||
null,
|
||||
INTERVAL_TO_INDEX,
|
||||
inputDir,
|
||||
TEST_FILE_NAME_PREFIX + "*",
|
||||
new SingleDimensionPartitionsSpec(
|
||||
targetRowsPerSegment,
|
||||
null,
|
||||
DIM1,
|
||||
false
|
||||
),
|
||||
2,
|
||||
false,
|
||||
0
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
|
||||
TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-1]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsSecondPhase() throws Exception
|
||||
{
|
||||
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
|
||||
final ParallelIndexSupervisorTask task =
|
||||
newTask(TIMESTAMP_SPEC,
|
||||
DIMENSIONS_SPEC,
|
||||
INPUT_FORMAT,
|
||||
null,
|
||||
INTERVAL_TO_INDEX,
|
||||
inputDir,
|
||||
TEST_FILE_NAME_PREFIX + "*",
|
||||
new SingleDimensionPartitionsSpec(
|
||||
targetRowsPerSegment,
|
||||
null,
|
||||
DIM1,
|
||||
false
|
||||
),
|
||||
2,
|
||||
false,
|
||||
1
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
|
||||
TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-2]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void failsThirdPhase() throws Exception
|
||||
{
|
||||
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
|
||||
final ParallelIndexSupervisorTask task =
|
||||
newTask(TIMESTAMP_SPEC,
|
||||
DIMENSIONS_SPEC,
|
||||
INPUT_FORMAT,
|
||||
null,
|
||||
INTERVAL_TO_INDEX,
|
||||
inputDir,
|
||||
TEST_FILE_NAME_PREFIX + "*",
|
||||
new SingleDimensionPartitionsSpec(
|
||||
targetRowsPerSegment,
|
||||
null,
|
||||
DIM1,
|
||||
false
|
||||
),
|
||||
2,
|
||||
false,
|
||||
2
|
||||
);
|
||||
|
||||
final TaskActionClient actionClient = createActionClient(task);
|
||||
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
task.stopGracefully(null);
|
||||
|
||||
|
||||
TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);
|
||||
|
||||
Assert.assertTrue(taskStatus.isFailure());
|
||||
Assert.assertEquals(
|
||||
"Failed in phase[PHASE-3]. See task logs for details.",
|
||||
taskStatus.getErrorMsg()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
static class ParallelIndexSupervisorTaskTest extends ParallelIndexSupervisorTask
|
||||
{
|
||||
// These variables control how many runners get created until it fails:
|
||||
private final int succeedsBeforeFailing;
|
||||
private int numRuns;
|
||||
|
||||
// These maps are a hacky way to provide some sort of mock object in the runner to make the run continue
|
||||
// until it fails (whatever they contain is nonsense other that it allows the code to make progress):
|
||||
private final Map<String, DimensionDistributionReport> firstMap;
|
||||
private final Map<String, DimensionDistributionReport> secondMap;
|
||||
|
||||
public ParallelIndexSupervisorTaskTest(
|
||||
String id,
|
||||
@Nullable String groupId,
|
||||
TaskResource taskResource,
|
||||
ParallelIndexIngestionSpec ingestionSchema,
|
||||
@Nullable String baseSubtaskSpecName,
|
||||
Map<String, Object> context,
|
||||
int succedsBeforeFailing
|
||||
|
||||
)
|
||||
{
|
||||
super(id, groupId, taskResource, ingestionSchema, baseSubtaskSpecName, context);
|
||||
this.succeedsBeforeFailing = succedsBeforeFailing;
|
||||
|
||||
this.firstMap = new HashMap<>();
|
||||
Map<Interval, StringDistribution> intervalToDistribution = new HashMap<>();
|
||||
intervalToDistribution.put(Intervals.of("2011-04-01/2011-04-02"), new StringSketch());
|
||||
this.firstMap.put("A", new DimensionDistributionReport("id", intervalToDistribution));
|
||||
|
||||
this.secondMap = Collections.emptyMap();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
<T extends Task, R extends SubTaskReport> ParallelIndexTaskRunner<T, R> createRunner(
|
||||
TaskToolbox toolbox,
|
||||
Function<TaskToolbox, ParallelIndexTaskRunner<T, R>> runnerCreator
|
||||
)
|
||||
{
|
||||
|
||||
// Below are the conditions to determine phase:
|
||||
ParallelIndexTaskRunner<T, R> retVal;
|
||||
if (succeedsBeforeFailing == 0) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, firstMap, "PHASE-1");
|
||||
} else if (succeedsBeforeFailing == 1
|
||||
&& numRuns == 1) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, secondMap, "PHASE-2");
|
||||
} else if (succeedsBeforeFailing == 2
|
||||
&& numRuns == 2) {
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(false, secondMap, "PHASE-3");
|
||||
} else {
|
||||
numRuns++;
|
||||
Map<String, DimensionDistributionReport> map;
|
||||
if (numRuns < 2) {
|
||||
map = firstMap;
|
||||
} else {
|
||||
map = secondMap;
|
||||
}
|
||||
retVal = (ParallelIndexTaskRunner<T, R>) new TestRunner(true, map, "SUCCESFUL-PHASE");
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
static class TestRunner
|
||||
implements ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport>
|
||||
{
|
||||
|
||||
private final boolean succeeds;
|
||||
private final String phase;
|
||||
|
||||
private final Map<String, DimensionDistributionReport> distributionMap;
|
||||
|
||||
TestRunner(boolean succeeds, Map<String, DimensionDistributionReport> distributionMap, String phase)
|
||||
{
|
||||
this.succeeds = succeeds;
|
||||
this.distributionMap = distributionMap;
|
||||
this.phase = phase;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
if (succeeds) {
|
||||
return StringUtils.format(phase);
|
||||
} else {
|
||||
return StringUtils.format(phase);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskState run()
|
||||
{
|
||||
if (succeeds) {
|
||||
return TaskState.SUCCESS;
|
||||
}
|
||||
return TaskState.FAILED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopGracefully()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectReport(DimensionDistributionReport report)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DimensionDistributionReport> getReports()
|
||||
{
|
||||
return distributionMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ParallelIndexingPhaseProgress getProgress()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRunningTaskIds()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionDistributionTask>> getSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionDistributionTask>> getRunningSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SubTaskSpec<PartialDimensionDistributionTask>> getCompleteSubTaskSpecs()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public SubTaskSpec<PartialDimensionDistributionTask> getSubTaskSpec(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public SubTaskSpecStatus getSubTaskState(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public TaskHistory<PartialDimensionDistributionTask> getCompleteSubTaskSpecAttemptHistory(String subTaskSpecId)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected ParallelIndexSupervisorTask newTask(
|
||||
@Nullable TimestampSpec timestampSpec,
|
||||
@Nullable DimensionsSpec dimensionsSpec,
|
||||
@Nullable InputFormat inputFormat,
|
||||
@Nullable ParseSpec parseSpec,
|
||||
Interval interval,
|
||||
File inputDir,
|
||||
String filter,
|
||||
PartitionsSpec partitionsSpec,
|
||||
int maxNumConcurrentSubTasks,
|
||||
boolean appendToExisting,
|
||||
int succeedsBeforeFailing
|
||||
)
|
||||
{
|
||||
GranularitySpec granularitySpec = new UniformGranularitySpec(
|
||||
SEGMENT_GRANULARITY,
|
||||
Granularities.MINUTE,
|
||||
interval == null ? null : Collections.singletonList(interval)
|
||||
);
|
||||
|
||||
ParallelIndexTuningConfig tuningConfig = newTuningConfig(
|
||||
partitionsSpec,
|
||||
maxNumConcurrentSubTasks,
|
||||
!appendToExisting
|
||||
);
|
||||
|
||||
final ParallelIndexIngestionSpec ingestionSpec;
|
||||
|
||||
Preconditions.checkArgument(parseSpec == null);
|
||||
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
|
||||
null,
|
||||
new LocalInputSource(inputDir, filter),
|
||||
inputFormat,
|
||||
appendToExisting,
|
||||
null
|
||||
);
|
||||
ingestionSpec = new ParallelIndexIngestionSpec(
|
||||
new DataSchema(
|
||||
DATASOURCE,
|
||||
timestampSpec,
|
||||
dimensionsSpec,
|
||||
new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")},
|
||||
granularitySpec,
|
||||
null
|
||||
),
|
||||
ioConfig,
|
||||
tuningConfig
|
||||
);
|
||||
|
||||
return new ParallelIndexSupervisorTaskTest(
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
ingestionSpec,
|
||||
null,
|
||||
Collections.emptyMap(),
|
||||
succeedsBeforeFailing
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -240,7 +240,7 @@ public class TaskMonitorTest
|
|||
monitor.collectReport(new SimpleSubTaskReport(getId()));
|
||||
if (shouldFail) {
|
||||
Thread.sleep(getRunTime());
|
||||
return TaskStatus.failure(getId());
|
||||
return TaskStatus.failure(getId(), "Dummy task status failure for testing");
|
||||
} else {
|
||||
return super.run(toolbox);
|
||||
}
|
||||
|
|
|
@ -200,8 +200,16 @@ public class RemoteTaskRunnerTestUtils
|
|||
|
||||
void mockWorkerCompleteFailedTask(final String workerId, final Task task) throws Exception
|
||||
{
|
||||
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(task, TaskStatus.failure(task.getId()), DUMMY_LOCATION);
|
||||
cf.setData().forPath(JOINER.join(STATUS_PATH, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
|
||||
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(
|
||||
task,
|
||||
TaskStatus.failure(
|
||||
task.getId(),
|
||||
"Dummy task status failure for testing"
|
||||
),
|
||||
DUMMY_LOCATION
|
||||
);
|
||||
cf.setData()
|
||||
.forPath(JOINER.join(STATUS_PATH, workerId, task.getId()), jsonMapper.writeValueAsBytes(taskAnnouncement));
|
||||
}
|
||||
|
||||
boolean workerRunningTask(final String workerId, final String taskId)
|
||||
|
|
|
@ -1380,7 +1380,7 @@ public class TaskLockboxTest
|
|||
@Override
|
||||
public TaskStatus run(TaskToolbox toolbox)
|
||||
{
|
||||
return TaskStatus.failure("how?");
|
||||
return TaskStatus.failure("how?", "Dummy task status err msg");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -176,11 +176,15 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
|
|||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
|
||||
}
|
||||
catch (Exception e) {
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
|
||||
String errMsg = "Graceful shutdown of task aborted with exception, see task logs for more information";
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId(), errMsg));
|
||||
throw new RE(e, "Graceful shutdown of task[%s] aborted with exception", task.getId());
|
||||
}
|
||||
} else {
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(task.getId()));
|
||||
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.failure(
|
||||
task.getId(),
|
||||
"Task failure while shutting down gracefully"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -416,11 +420,11 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
|
|||
log.warn(e, "Interrupted while running task[%s]", task);
|
||||
}
|
||||
|
||||
status = TaskStatus.failure(task.getId());
|
||||
status = TaskStatus.failure(task.getId(), "Task failed due to its thread being interrupted");
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception while running task[%s]", task);
|
||||
status = TaskStatus.failure(task.getId());
|
||||
status = TaskStatus.failure(task.getId(), "Task failed");
|
||||
}
|
||||
catch (Throwable t) {
|
||||
throw new RE(t, "Uncaught Throwable while running task[%s]", task);
|
||||
|
|
|
@ -1084,7 +1084,7 @@ public class HttpRemoteTaskRunnerTest
|
|||
EasyMock.replay(rogueWorkerHolder);
|
||||
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
|
||||
task,
|
||||
TaskStatus.failure(task.getId()),
|
||||
TaskStatus.failure(task.getId(), "Dummy task status failure err message"),
|
||||
TaskLocation.create("rogue-worker", 1, 2)
|
||||
), rogueWorkerHolder);
|
||||
Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getRunningTasks()).getTaskId());
|
||||
|
@ -1122,7 +1122,7 @@ public class HttpRemoteTaskRunnerTest
|
|||
EasyMock.replay(rogueWorkerHolder);
|
||||
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
|
||||
task,
|
||||
TaskStatus.failure(task.getId()),
|
||||
TaskStatus.failure(task.getId(), "Dummy task status failure for testing"),
|
||||
TaskLocation.create("rogue-worker", 1, 2)
|
||||
), rogueWorkerHolder);
|
||||
Assert.assertEquals(task.getId(), Iterables.getOnlyElement(taskRunner.getCompletedTasks()).getTaskId());
|
||||
|
|
Loading…
Reference in New Issue