Inject things instead of subclassing everything for parallel task testing (#9353)

* Inject things instead of subclassing everything for parallel task testing

* javadoc

* fix compilation

* fix wrong merge

* Address comments
Jihoon Son 2020-02-16 13:00:12 -08:00 committed by GitHub
parent b1be88d79c
commit 3bb9e7e53a
36 changed files with 997 additions and 1158 deletions


@@ -76,7 +76,8 @@ public abstract class AbstractTask implements Task
this.groupId = groupId == null ? id : groupId;
this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.context = context == null ? new HashMap<>() : context;
// Copy the given context into a new mutable map because the Druid indexing service can add internal entries to it.
this.context = context == null ? new HashMap<>() : new HashMap<>(context);
}
public static String getOrMakeId(String id, final String typeName, String dataSource)
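
Why the defensive copy above matters: the old code aliased the caller's map, so any internal entry the indexing service added later would mutate, or fail on, the caller's context. A minimal runnable sketch of the failure mode the copy prevents; the demo class and the context key string are illustrative, not part of this PR:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ContextCopyDemo
{
  public static void main(String[] args)
  {
    // Callers (or Jackson) may hand the task an unmodifiable map.
    final Map<String, Object> callerContext =
        Collections.unmodifiableMap(new HashMap<>(Collections.singletonMap("priority", 50)));

    // Aliasing callerContext directly would throw UnsupportedOperationException
    // as soon as the indexing service adds an internal entry. The defensive copy
    // makes the later addition safe:
    final Map<String, Object> taskContext = new HashMap<>(callerContext);
    taskContext.put("forceTimeChunkLock", true); // illustrative internal entry

    System.out.println(taskContext);
  }
}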


@@ -287,6 +287,12 @@ public class CompactionTask extends AbstractBatchIndexTask
}
}
@VisibleForTesting
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
return currentSubTaskHolder;
}
@JsonProperty
public CompactionIOConfig getIoConfig()
{


@@ -26,13 +26,11 @@ import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
/**
* This class represents the intermediary data server where the partition of {@link #interval} and {@link #shardSpec}
* This class represents the intermediary data server where the partition of {@code interval} and {@code shardSpec}
* is stored.
*/
public class GenericPartitionLocation extends PartitionLocation<ShardSpec>
{
private final ShardSpec shardSpec;
@JsonCreator
public GenericPartitionLocation(
@JsonProperty("host") String host,
@@ -44,19 +42,18 @@ public class GenericPartitionLocation extends PartitionLocation<ShardSpec>
)
{
super(host, port, useHttps, subTaskId, interval, shardSpec);
this.shardSpec = shardSpec;
}
@JsonIgnore
@Override
public int getPartitionId()
{
return shardSpec.getPartitionNum();
return getSecondaryPartition().getPartitionNum();
}
@JsonProperty
ShardSpec getShardSpec()
{
return shardSpec;
return getSecondaryPartition();
}
}


@@ -24,13 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval;
/**
* This class represents the intermediary data server where the partition of {@link #interval} and {@link #partitionId}
* This class represents the intermediary data server where the partition of {@code interval} and {@code partitionId}
* is stored.
*/
public class HashPartitionLocation extends PartitionLocation<Integer>
{
private final int partitionId;
@JsonCreator
public HashPartitionLocation(
@JsonProperty("host") String host,
@@ -42,13 +40,12 @@ public class HashPartitionLocation extends PartitionLocation<Integer>
)
{
super(host, port, useHttps, subTaskId, interval, partitionId);
this.partitionId = partitionId;
}
@JsonProperty
@Override
public int getPartitionId()
{
return partitionId;
return getSecondaryPartition();
}
}


@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.jboss.netty.handler.codec.http.HttpMethod;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutionException;
/**
* HTTP-based ShuffleClient.
* This class is injected as a lazy singleton instance and thus must be stateless.
*/
public class HttpShuffleClient implements ShuffleClient
{
@VisibleForTesting
static final int NUM_FETCH_RETRIES = 3;
private static final int BUFFER_SIZE = 1024 * 4;
private final HttpClient httpClient;
@Inject
public HttpShuffleClient(@EscalatedClient HttpClient httpClient)
{
this.httpClient = httpClient;
}
@Override
public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
File partitionDir,
String supervisorTaskId,
P location
) throws IOException
{
// Create a buffer local to this call: a shared buffer field would not be thread-safe,
// and this method can be called by different threads at the same time with ThreadingTaskRunner.
final byte[] buffer = new byte[BUFFER_SIZE];
final File zippedFile = new File(partitionDir, StringUtils.format("temp_%s", location.getSubTaskId()));
final URI uri = location.toIntermediaryDataServerURI(supervisorTaskId);
FileUtils.copyLarge(
uri,
u -> {
try {
return httpClient.go(new Request(HttpMethod.GET, u.toURL()), new InputStreamResponseHandler())
.get();
}
catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
},
zippedFile,
buffer,
t -> t instanceof IOException,
NUM_FETCH_RETRIES,
StringUtils.format("Failed to fetch file[%s]", uri)
);
return zippedFile;
}
}
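
Given the @Inject constructor and the lazy-singleton note in the javadoc above, the Guice wiring for this client presumably looks something like the sketch below; the module name is made up here, and the actual binding site in Druid may differ:

package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.guice.LazySingleton;

// Hypothetical module illustrating the binding implied by the javadoc:
// ShuffleClient resolves to HttpShuffleClient, created once and lazily, with
// the @EscalatedClient HttpClient supplied by Guice through the constructor.
public class ShuffleClientModule implements Module
{
  @Override
  public void configure(Binder binder)
  {
    binder.bind(ShuffleClient.class).to(HttpShuffleClient.class).in(LazySingleton.class);
  }
}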


@@ -110,8 +110,7 @@ abstract class InputSourceSplitParallelIndexTaskRunner<T extends Task, R extends
getTaskId(),
getContext(),
split,
subTaskIngestionSpec,
getIndexingServiceClient()
subTaskIngestionSpec
);
}
@@ -124,7 +123,6 @@ abstract class InputSourceSplitParallelIndexTaskRunner<T extends Task, R extends
String supervisorTaskId,
Map<String, Object> context,
InputSplit split,
ParallelIndexIngestionSpec subTaskIngestionSpec,
IndexingServiceClient indexingServiceClient
ParallelIndexIngestionSpec subTaskIngestionSpec
);
}


@@ -94,8 +94,7 @@ class PartialDimensionDistributionParallelIndexTaskRunner
String supervisorTaskId,
Map<String, Object> context,
InputSplit split,
ParallelIndexIngestionSpec subTaskIngestionSpec,
IndexingServiceClient indexingServiceClient
ParallelIndexIngestionSpec subTaskIngestionSpec
)
{
return new SubTaskSpec<PartialDimensionDistributionTask>(


@@ -26,11 +26,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
@@ -60,7 +58,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Shar
@JsonProperty("context") final Map<String, Object> context,
@JacksonInject IndexingServiceClient indexingServiceClient,
@JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
@JacksonInject @EscalatedClient HttpClient shuffleClient
@JacksonInject ShuffleClient shuffleClient
)
{
super(


@@ -19,12 +19,9 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import java.util.Map;
@@ -38,10 +35,6 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
{
private static final String PHASE_NAME = "partial segment generation";
// For tests
private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
private final AppenderatorsManager appenderatorsManager;
PartialHashSegmentGenerateParallelIndexTaskRunner(
TaskToolbox toolbox,
String taskId,
@@ -50,25 +43,8 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
this(toolbox, taskId, groupId, ingestionSchema, context, indexingServiceClient, null, null);
}
@VisibleForTesting
PartialHashSegmentGenerateParallelIndexTaskRunner(
TaskToolbox toolbox,
String taskId,
String groupId,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
AppenderatorsManager appenderatorsManager
)
{
super(toolbox, taskId, groupId, ingestionSchema, context, indexingServiceClient);
this.taskClientFactory = taskClientFactory;
this.appenderatorsManager = appenderatorsManager;
}
@Override
@@ -84,8 +60,7 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
String supervisorTaskId,
Map<String, Object> context,
InputSplit split,
ParallelIndexIngestionSpec subTaskIngestionSpec,
IndexingServiceClient indexingServiceClient
ParallelIndexIngestionSpec subTaskIngestionSpec
)
{
return new SubTaskSpec<PartialHashSegmentGenerateTask>(
@@ -107,9 +82,9 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
numAttempts,
subTaskIngestionSpec,
context,
indexingServiceClient,
taskClientFactory,
appenderatorsManager
null,
null,
null
);
}
};


@@ -24,13 +24,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.joda.time.Interval;
@@ -63,7 +61,7 @@ public class PartialHashSegmentMergeTask
@JsonProperty("context") final Map<String, Object> context,
@JacksonInject IndexingServiceClient indexingServiceClient,
@JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
@JacksonInject @EscalatedClient HttpClient shuffleClient
@JacksonInject ShuffleClient shuffleClient
)
{
super(


@@ -19,12 +19,9 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.joda.time.Interval;
@@ -40,8 +37,6 @@ class PartialRangeSegmentGenerateParallelIndexTaskRunner
{
private static final String PHASE_NAME = "partial segment generation";
private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
private final AppenderatorsManager appenderatorsManager;
private final Map<Interval, PartitionBoundaries> intervalToPartitions;
PartialRangeSegmentGenerateParallelIndexTaskRunner(
@@ -53,36 +48,8 @@ class PartialRangeSegmentGenerateParallelIndexTaskRunner
IndexingServiceClient indexingServiceClient,
Map<Interval, PartitionBoundaries> intervalToPartitions
)
{
this(
toolbox,
taskId,
groupId,
ingestionSchema,
context,
indexingServiceClient,
intervalToPartitions,
null,
null
);
}
@VisibleForTesting
PartialRangeSegmentGenerateParallelIndexTaskRunner(
TaskToolbox toolbox,
String taskId,
String groupId,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
Map<Interval, PartitionBoundaries> intervalToPartitions,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
AppenderatorsManager appenderatorsManager
)
{
super(toolbox, taskId, groupId, ingestionSchema, context, indexingServiceClient);
this.taskClientFactory = taskClientFactory;
this.appenderatorsManager = appenderatorsManager;
this.intervalToPartitions = intervalToPartitions;
}
@@ -99,8 +66,7 @@ class PartialRangeSegmentGenerateParallelIndexTaskRunner
String supervisorTaskId,
Map<String, Object> context,
InputSplit split,
ParallelIndexIngestionSpec subTaskIngestionSpec,
IndexingServiceClient indexingServiceClient
ParallelIndexIngestionSpec subTaskIngestionSpec
)
{
return new SubTaskSpec<PartialRangeSegmentGenerateTask>(
@@ -123,9 +89,9 @@ class PartialRangeSegmentGenerateParallelIndexTaskRunner
subTaskIngestionSpec,
context,
intervalToPartitions,
indexingServiceClient,
taskClientFactory,
appenderatorsManager
null,
null,
null
);
}
};


@@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
@@ -43,9 +42,6 @@ import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
@@ -56,13 +52,11 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -72,7 +66,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -82,16 +75,14 @@ import java.util.stream.Collectors;
abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionLocation> extends PerfectRollupWorkerTask
{
private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);
private static final int BUFFER_SIZE = 1024 * 4;
private static final int NUM_FETCH_RETRIES = 3;
private final PartialSegmentMergeIOConfig<P> ioConfig;
private final int numAttempts;
private final String supervisorTaskId;
private final IndexingServiceClient indexingServiceClient;
private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
private final HttpClient shuffleClient;
private final byte[] buffer;
private final ShuffleClient shuffleClient;
PartialSegmentMergeTask(
// id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@@ -106,7 +97,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
final Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
HttpClient shuffleClient
ShuffleClient shuffleClient
)
{
super(
@@ -124,7 +115,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
this.indexingServiceClient = indexingServiceClient;
this.taskClientFactory = taskClientFactory;
this.shuffleClient = shuffleClient;
this.buffer = new byte[BUFFER_SIZE];
}
@JsonProperty
@@ -234,7 +224,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
);
FileUtils.forceMkdir(partitionDir);
for (P location : entryPerPartitionId.getValue()) {
final File zippedFile = fetchSegmentFile(partitionDir, location);
final File zippedFile = shuffleClient.fetchSegmentFile(partitionDir, supervisorTaskId, location);
try {
final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
FileUtils.forceMkdir(unzippedDir);
@@ -254,31 +244,6 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
return intervalToUnzippedFiles;
}
@VisibleForTesting
File fetchSegmentFile(File partitionDir, P location) throws IOException
{
final File zippedFile = new File(partitionDir, StringUtils.format("temp_%s", location.getSubTaskId()));
final URI uri = location.toIntermediaryDataServerURI(supervisorTaskId);
org.apache.druid.java.util.common.FileUtils.copyLarge(
uri,
u -> {
try {
return shuffleClient.go(new Request(HttpMethod.GET, u.toURL()), new InputStreamResponseHandler())
.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
},
zippedFile,
buffer,
t -> t instanceof IOException,
NUM_FETCH_RETRIES,
StringUtils.format("Failed to fetch file[%s]", uri)
);
return zippedFile;
}
/**
* Create a {@link ShardSpec} suitable for the desired secondary partitioning strategy.
*/


@@ -19,6 +19,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.Interval;
@@ -86,6 +87,12 @@ abstract class PartitionLocation<T>
return interval;
}
@JsonIgnore
public T getSecondaryPartition()
{
return secondaryPartition;
}
abstract int getPartitionId();
final URI toIntermediaryDataServerURI(String supervisorTaskId)


@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import java.io.File;
import java.io.IOException;
/**
* An interface for shuffling intermediate data during parallel indexing.
* The only production implementation is {@link HttpShuffleClient}; this interface
* exists mainly to make testing easier.
*
* @see org.apache.druid.indexing.worker.IntermediaryDataManager
* @see PartialSegmentMergeTask
*/
public interface ShuffleClient
{
/**
* Fetch the segment file into local storage for the given supervisorTaskId and location.
* If the segment file must be fetched from a remote site, the returned file is created under the given
* partitionDir. Otherwise, the returned file may be located at any path.
*/
<T, P extends PartitionLocation<T>> File fetchSegmentFile(File partitionDir, String supervisorTaskId, P location)
throws IOException;
}
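
Because the contract is this small, a test double is easy to write. Below is a hedged sketch: the class name and directory layout are invented for illustration, and the real test implementation added by this PR is LocalShuffleClient, shown further down.

package org.apache.druid.indexing.common.task.batch.parallel;

import java.io.File;

// Illustrative ShuffleClient that resolves partition files from a local
// directory tree instead of making HTTP calls. Per the contract above, files
// that are already local need not be copied under partitionDir.
class DirectoryShuffleClient implements ShuffleClient
{
  private final File baseDir;

  DirectoryShuffleClient(File baseDir)
  {
    this.baseDir = baseDir;
  }

  @Override
  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
      File partitionDir,
      String supervisorTaskId,
      P location
  )
  {
    // Hypothetical layout: <baseDir>/<supervisorTaskId>/<subTaskId>.zip
    return new File(baseDir, supervisorTaskId + "/" + location.getSubTaskId() + ".zip");
  }
}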


@@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.impl.NoopInputFormat;
@@ -41,9 +40,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervi
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
@@ -57,7 +53,6 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.joda.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -71,28 +66,6 @@ public class TestUtils
public static final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> TASK_CLIENT_FACTORY = new NoopIndexTaskClientFactory<>();
public static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
public static final HttpClient SHUFFLE_CLIENT = new HttpClient()
{
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,
HttpResponseHandler<Intermediate, Final> handler
)
{
throw new UnsupportedOperationException();
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request,
HttpResponseHandler<Intermediate, Final> handler,
Duration readTimeout
)
{
throw new UnsupportedOperationException();
}
};
private static final Logger log = new Logger(TestUtils.class);
private final ObjectMapper jsonMapper;
@@ -126,7 +99,6 @@ public class TestUtils
.addValue(AppenderatorsManager.class, APPENDERATORS_MANAGER)
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
.addValue(IndexTaskClientFactory.class, TASK_CLIENT_FACTORY)
.addValue(HttpClient.class, SHUFFLE_CLIENT)
);
jsonMapper.registerModule(


@@ -19,61 +19,40 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexingTest.TestSupervisorTask;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@RunWith(Parameterized.class)
@@ -90,59 +69,22 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
private static final String DATA_SOURCE = "test";
private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new RetryPolicyFactory(new RetryPolicyConfig());
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2014-01-01/2014-01-02");
private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
private final LockGranularity lockGranularity;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final CoordinatorClient coordinatorClient;
public CompactionTaskParallelRunTest(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
this.rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
coordinatorClient = new CoordinatorClient(null, null)
{
@Override
public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
String dataSource,
List<Interval> intervals
)
{
return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
}
@Override
public DataSegment fetchUsedSegment(String dataSource, String segmentId)
{
ImmutableDruidDataSource druidDataSource =
getSegmentsMetadataManager().getImmutableDataSourceWithUsedSegments(dataSource);
if (druidDataSource == null) {
throw new ISE("Unknown datasource[%s]", dataSource);
}
for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) {
DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
if (segment != null) {
return segment;
}
}
throw new ISE("Can't find segment for id[%s]", segmentId);
}
};
}
@Before
public void setup() throws IOException
public void setup()
{
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder();
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
getObjectMapper().registerSubtypes(ParallelIndexTuningConfig.class, DruidInputSource.class);
}
@Test
@@ -150,11 +92,13 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
{
runIndexTask();
final CompactionTask compactionTask = new TestCompactionTask(
final CompactionTask compactionTask = new CompactionTask(
null,
null,
DATA_SOURCE,
new CompactionIOConfig(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null)),
null,
null,
new CompactionIOConfig(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null)),
null,
null,
null,
@@ -165,8 +109,8 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
rowIngestionMetersFactory,
coordinatorClient,
indexingServiceClient,
null,
null,
getSegmentLoaderFactory(),
RETRY_POLICY_FACTORY,
appenderatorsManager
@@ -180,20 +124,18 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
{
runIndexTask();
Interval interval = Intervals.of("2014-01-01/2014-01-02");
List<InputSplit<List<WindowedSegmentId>>> splits = DruidInputSource.createSplits(
coordinatorClient,
getCoordinatorClient(),
RETRY_POLICY_FACTORY,
DATA_SOURCE,
interval,
INTERVAL_TO_INDEX,
new SegmentsSplitHintSpec(1L) // each segment gets its own split with this config
);
List<DataSegment> segments = new ArrayList<>(
coordinatorClient.fetchUsedSegmentsInDataSourceForIntervals(
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(
DATA_SOURCE,
ImmutableList.of(interval)
ImmutableList.of(INTERVAL_TO_INDEX)
)
);
@@ -249,15 +191,10 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
runTask(indexTask);
}
private void runTask(Task task) throws Exception
private void runTask(Task task)
{
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
shutdownTask(task);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
private static ParallelIndexTuningConfig newTuningConfig()
@@ -291,69 +228,4 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
null
);
}
private static class TestCompactionTask extends CompactionTask
{
private final IndexingServiceClient indexingServiceClient;
TestCompactionTask(
String id,
TaskResource taskResource,
String dataSource,
@Nullable CompactionIOConfig ioConfig,
@Nullable DimensionsSpec dimensions,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable AggregatorFactory[] metricsSpec,
@Nullable Granularity segmentGranularity,
@Nullable ParallelIndexTuningConfig tuningConfig,
@Nullable Map<String, Object> context,
ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider,
RowIngestionMetersFactory rowIngestionMetersFactory,
CoordinatorClient coordinatorClient,
@Nullable IndexingServiceClient indexingServiceClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory,
AppenderatorsManager appenderatorsManager
)
{
super(
id,
taskResource,
dataSource,
null,
null,
ioConfig,
dimensions,
dimensionsSpec,
metricsSpec,
segmentGranularity,
tuningConfig,
context,
jsonMapper,
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory,
coordinatorClient,
indexingServiceClient,
segmentLoaderFactory,
retryPolicyFactory,
appenderatorsManager
);
this.indexingServiceClient = indexingServiceClient;
}
@Override
ParallelIndexSupervisorTask newTask(String taskId, ParallelIndexIngestionSpec ingestionSpec)
{
return new TestSupervisorTask(
taskId,
null,
ingestionSpec,
createContextForSubtask(),
indexingServiceClient
);
}
}
}


@@ -824,9 +824,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
getTaskStorage().insert(task, TaskStatus.running(task.getId()));
final ObjectMapper objectMapper = getObjectMapper();
objectMapper.registerSubtypes(
new NamedType(LocalLoadSpec.class, "local")
);
objectMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
final TaskToolbox box = createTaskToolbox(objectMapper, task);


@@ -95,7 +95,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
private TaskLockbox lockbox;
@Before
public void setUp() throws IOException
public void setUpIngestionTestBase() throws IOException
{
temporaryFolder.create();
@@ -119,7 +119,7 @@
}
@After
public void tearDown()
public void tearDownIngestionTestBase()
{
temporaryFolder.delete();
}


@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
@@ -28,7 +27,6 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@@ -54,14 +52,10 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -87,21 +81,6 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
this.useInputFormatApi = useInputFormatApi;
}
@Before
public void setup() throws IOException
{
localDeepStorage = temporaryFolder.newFolder("localStorage");
indexingServiceClient = new LocalIndexingServiceClient();
initializeIntermediaryDataManager();
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
}
Set<DataSegment> runTestTask(
ParseSpec parseSpec,
Interval interval,
@@ -110,7 +89,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
DimensionBasedPartitionsSpec partitionsSpec,
int maxNumConcurrentSubTasks,
TaskState expectedTaskStatus
) throws Exception
)
{
final ParallelIndexSupervisorTask task = newTask(
parseSpec,
@@ -121,18 +100,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
maxNumConcurrentSubTasks
);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
TaskStatus taskStatus = task.run(toolbox);
TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task);
Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
shutdownTask(task);
return actionClient.getPublishedSegments();
return getIndexingServiceClient().getPublishedSegments(task);
}
private ParallelIndexSupervisorTask newTask(
@@ -228,23 +199,20 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
}
// set up test tools
return createParallelIndexSupervisorTask(
return new ParallelIndexSupervisorTask(
null,
null,
null,
ingestionSpec,
new HashMap<>(),
indexingServiceClient
Collections.emptyMap(),
null,
null,
null,
null,
null
);
}
abstract ParallelIndexSupervisorTask createParallelIndexSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
);
List<ScanResultValue> querySegment(DataSegment dataSegment, List<String> columns, File tempSegmentDir)
{
Segment segment = loadSegment(dataSegment, tempSegmentDir);


@@ -19,12 +19,20 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
@@ -35,38 +43,60 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.AllowAllAuthorizer;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -74,14 +104,21 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
static final String DISABLE_TASK_INJECT_CONTEXT_KEY = "disableInject";
static final TimestampSpec DEFAULT_TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
static final DimensionsSpec DEFAULT_DIMENSIONS_SPEC = new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim"))
@@ -124,18 +161,27 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null
);
protected TestLocalTaskActionClient actionClient;
protected LocalIndexingServiceClient indexingServiceClient;
protected TaskToolbox toolbox;
protected File localDeepStorage;
private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private File localDeepStorage;
private SimpleThreadingTaskRunner taskRunner;
private ObjectMapper objectMapper;
private LocalIndexingServiceClient indexingServiceClient;
private IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> indexTaskClientFactory;
private IntermediaryDataManager intermediaryDataManager;
private CoordinatorClient coordinatorClient;
protected void initializeIntermediaryDataManager() throws IOException
@Before
public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException
{
localDeepStorage = temporaryFolder.newFolder("localStorage");
taskRunner = new SimpleThreadingTaskRunner();
objectMapper = getObjectMapper();
indexingServiceClient = new LocalIndexingServiceClient(objectMapper, taskRunner);
indexTaskClientFactory = new LocalParallelIndexTaskClientFactory(taskRunner);
intermediaryDataManager = new IntermediaryDataManager(
new WorkerConfig(),
new TaskConfig(
@@ -151,127 +197,196 @@
),
null
);
LocalShuffleClient shuffleClient = new LocalShuffleClient(intermediaryDataManager);
coordinatorClient = new LocalCoordinatorClient();
prepareObjectMapper(
objectMapper,
getIndexIO(),
indexingServiceClient,
indexTaskClientFactory,
shuffleClient,
coordinatorClient
);
}
public class LocalIndexingServiceClient extends NoopIndexingServiceClient
@After
public void tearDownAbstractParallelIndexSupervisorTaskTest()
{
private final ConcurrentMap<String, Future<TaskStatus>> tasks = new ConcurrentHashMap<>();
taskRunner.shutdown();
temporaryFolder.delete();
}
protected LocalIndexingServiceClient getIndexingServiceClient()
{
return indexingServiceClient;
}
protected IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> getParallelIndexTaskClientFactory()
{
return indexTaskClientFactory;
}
protected CoordinatorClient getCoordinatorClient()
{
return coordinatorClient;
}
private static class TaskContainer
{
private final Task task;
@MonotonicNonNull
private volatile Future<TaskStatus> statusFuture;
@MonotonicNonNull
private volatile TestLocalTaskActionClient actionClient;
private TaskContainer(Task task)
{
this.task = task;
}
private void setStatusFuture(Future<TaskStatus> statusFuture)
{
this.statusFuture = statusFuture;
}
private void setActionClient(TestLocalTaskActionClient actionClient)
{
this.actionClient = actionClient;
}
}
public class SimpleThreadingTaskRunner
{
private final ConcurrentMap<String, TaskContainer> tasks = new ConcurrentHashMap<>();
private final ListeningExecutorService service = MoreExecutors.listeningDecorator(
Execs.multiThreaded(5, "parallel-index-supervisor-task-test-%d")
Execs.multiThreaded(5, "simple-threading-task-runner-%d")
);
@Override
public String runTask(Object taskObject)
public String run(Task task)
{
runTask(task);
return task.getId();
}
private TaskStatus runAndWait(Task task)
{
final Task subTask = (Task) taskObject;
try {
getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
return runTask(task).get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
private TaskStatus waitToFinish(Task task, long waitTime, TimeUnit timeUnit)
{
final TaskContainer taskContainer = tasks.get(task.getId());
if (taskContainer == null) {
throw new IAE("Unknown task[%s]", task.getId());
}
try {
while (taskContainer.statusFuture == null && !Thread.currentThread().isInterrupted()) {
Thread.sleep(10);
}
return taskContainer.statusFuture.get(waitTime, timeUnit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
private Future<TaskStatus> runTask(Task task)
{
final TaskContainer taskContainer = new TaskContainer(task);
if (tasks.put(task.getId(), taskContainer) != null) {
throw new ISE("Duplicate task ID[%s]", task.getId());
}
try {
prepareTaskForLocking(task);
}
catch (EntryExistsException e) {
throw new RuntimeException(e);
}
// WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
// cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
// JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
tasks.put(subTask.getId(), service.submit(() -> {
try {
final TaskToolbox toolbox = createTaskToolbox(subTask);
if (subTask.isReady(toolbox.getTaskActionClient())) {
return subTask.run(toolbox);
} else {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
throw new ISE("task[%s] is not ready", subTask.getId());
final ListenableFuture<TaskStatus> statusFuture = service.submit(
() -> {
try {
final TestLocalTaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
taskContainer.setActionClient(actionClient);
if (task.isReady(toolbox.getTaskActionClient())) {
return task.run(toolbox);
} else {
getTaskStorage().setStatus(TaskStatus.failure(task.getId()));
throw new ISE("task[%s] is not ready", task.getId());
}
}
catch (Exception e) {
getTaskStorage().setStatus(TaskStatus.failure(task.getId(), e.getMessage()));
throw new RuntimeException(e);
}
}
}
catch (Exception e) {
getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), e.getMessage()));
throw new RuntimeException(e);
}
}));
return subTask.getId();
);
taskContainer.setStatusFuture(statusFuture);
final ListenableFuture<TaskStatus> cleanupFuture = Futures.transform(
statusFuture,
(Function<TaskStatus, TaskStatus>) status -> {
shutdownTask(task);
return status;
}
);
return cleanupFuture;
}
@Override
public TaskStatusResponse getTaskStatus(String taskId)
@Nullable
public String cancel(String taskId)
{
final Future<TaskStatus> taskStatusFuture = tasks.get(taskId);
final Optional<Task> task = getTaskStorage().getTask(taskId);
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
if (taskStatusFuture != null) {
final TaskContainer taskContainer = tasks.remove(taskId);
if (taskContainer != null && taskContainer.statusFuture != null) {
taskContainer.statusFuture.cancel(true);
return taskId;
} else {
return null;
}
}
@Nullable
public TaskStatus getStatus(String taskId)
{
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer != null && taskContainer.statusFuture != null) {
try {
if (taskStatusFuture.isDone()) {
final TaskStatus taskStatus = taskStatusFuture.get();
return new TaskStatusResponse(
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
taskStatus.getStatusCode(),
RunnerTaskState.NONE,
-1L,
TaskLocation.unknown(),
null,
null
)
);
if (taskContainer.statusFuture.isDone()) {
return taskContainer.statusFuture.get();
} else {
return new TaskStatusResponse(
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
TaskState.RUNNING,
RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
null,
null
)
);
return TaskStatus.running(taskId);
}
}
catch (InterruptedException | ExecutionException e) {
// We don't have a way to propagate this exception to the supervisorTask yet,
// so print it here.
System.err.println(Throwables.getStackTraceAsString(e));
return new TaskStatusResponse(
taskId,
new TaskStatusPlus(
taskId,
groupId,
SinglePhaseSubTask.TYPE,
DateTimes.EPOCH,
DateTimes.EPOCH,
TaskState.FAILED,
RunnerTaskState.NONE,
-1L,
TaskLocation.unknown(),
null,
null
)
);
return TaskStatus.failure(taskId, e.getMessage());
}
} else {
return new TaskStatusResponse(taskId, null);
return null;
}
}
@Override
public String cancelTask(String taskId)
public Set<DataSegment> getPublishedSegments(String taskId)
{
final Future<TaskStatus> taskStatusFuture = tasks.remove(taskId);
if (taskStatusFuture != null) {
taskStatusFuture.cancel(true);
return taskId;
final TaskContainer taskContainer = tasks.get(taskId);
if (taskContainer == null || taskContainer.actionClient == null) {
return Collections.emptySet();
} else {
return null;
return taskContainer.actionClient.getPublishedSegments();
}
}
@@ -281,7 +396,132 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
}
}
protected TaskToolbox createTaskToolbox(Task task) throws IOException
public class LocalIndexingServiceClient extends NoopIndexingServiceClient
{
private final ObjectMapper objectMapper;
private final SimpleThreadingTaskRunner taskRunner;
public LocalIndexingServiceClient(ObjectMapper objectMapper, SimpleThreadingTaskRunner taskRunner)
{
this.objectMapper = objectMapper;
this.taskRunner = taskRunner;
}
@Override
public String runTask(Object taskObject)
{
final Task task = (Task) taskObject;
return taskRunner.run(injectIfNeeded(task));
}
public TaskStatus runAndWait(Task task)
{
return taskRunner.runAndWait(injectIfNeeded(task));
}
public TaskStatus waitToFinish(Task task, long timeout, TimeUnit timeUnit)
{
return taskRunner.waitToFinish(task, timeout, timeUnit);
}
private Task injectIfNeeded(Task task)
{
if (!task.getContextValue(DISABLE_TASK_INJECT_CONTEXT_KEY, false)) {
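// Round-trip the task through JSON so that, as with production subtasks sent
// over HTTP, each task is instantiated by Jackson and shares no objects with
// its supervisor (see the WARNING in SimpleThreadingTaskRunner above).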
try {
final byte[] json = objectMapper.writeValueAsBytes(task);
return objectMapper.readValue(json, Task.class);
}
catch (IOException e) {
LOG.error(e, "Error while serializing and deserializing task spec");
throw new RuntimeException(e);
}
} else {
return task;
}
}
@Override
public String cancelTask(String taskId)
{
return taskRunner.cancel(taskId);
}
@Override
public TaskStatusResponse getTaskStatus(String taskId)
{
final Optional<Task> task = getTaskStorage().getTask(taskId);
final String groupId = task.isPresent() ? task.get().getGroupId() : null;
final String taskType = task.isPresent() ? task.get().getType() : null;
final TaskStatus taskStatus = taskRunner.getStatus(taskId);
if (taskStatus != null) {
return new TaskStatusResponse(
taskId,
new TaskStatusPlus(
taskId,
groupId,
taskType,
DateTimes.EPOCH,
DateTimes.EPOCH,
taskStatus.getStatusCode(),
taskStatus.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING,
-1L,
TaskLocation.unknown(),
null,
null
)
);
} else {
return new TaskStatusResponse(taskId, null);
}
}
public Set<DataSegment> getPublishedSegments(Task task)
{
return taskRunner.getPublishedSegments(task.getId());
}
}
public static void prepareObjectMapper(
ObjectMapper objectMapper,
IndexIO indexIO,
IndexingServiceClient indexingServiceClient,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> indexTaskClientFactory,
ShuffleClient shuffleClient,
CoordinatorClient coordinatorClient
)
{
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE)
.addValue(IndexIO.class, indexIO)
.addValue(ObjectMapper.class, objectMapper)
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
.addValue(AuthConfig.class, new AuthConfig())
.addValue(AuthorizerMapper.class, null)
.addValue(RowIngestionMetersFactory.class, new DropwizardRowIngestionMetersFactory())
.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT)
.addValue(IndexingServiceClient.class, indexingServiceClient)
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER)
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
.addValue(IndexTaskClientFactory.class, indexTaskClientFactory)
.addValue(ShuffleClient.class, shuffleClient)
.addValue(CoordinatorClient.class, coordinatorClient)
.addValue(SegmentLoaderFactory.class, new SegmentLoaderFactory(indexIO, objectMapper))
.addValue(RetryPolicyFactory.class, new RetryPolicyFactory(new RetryPolicyConfig()))
);
objectMapper.registerSubtypes(
new NamedType(ParallelIndexSupervisorTask.class, ParallelIndexSupervisorTask.TYPE),
new NamedType(SinglePhaseSubTask.class, SinglePhaseSubTask.TYPE),
new NamedType(PartialHashSegmentGenerateTask.class, PartialHashSegmentGenerateTask.TYPE),
new NamedType(PartialHashSegmentMergeTask.class, PartialHashSegmentMergeTask.TYPE),
new NamedType(PartialRangeSegmentGenerateTask.class, PartialRangeSegmentGenerateTask.TYPE),
new NamedType(PartialGenericSegmentMergeTask.class, PartialGenericSegmentMergeTask.TYPE),
new NamedType(PartialDimensionDistributionTask.class, PartialDimensionDistributionTask.TYPE)
);
}
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
return new TaskToolbox(
null,
@@ -309,7 +549,7 @@
NoopJoinableFactory.INSTANCE,
null,
newSegmentLoader(temporaryFolder.newFolder()),
getObjectMapper(),
objectMapper,
temporaryFolder.newFolder(task.getId()),
getIndexIO(),
null,
@@ -379,13 +619,42 @@
}
}
static class LocalShuffleClient implements ShuffleClient
{
private final IntermediaryDataManager intermediaryDataManager;
LocalShuffleClient(IntermediaryDataManager intermediaryDataManager)
{
this.intermediaryDataManager = intermediaryDataManager;
}
@Override
public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
File partitionDir,
String supervisorTaskId,
P location
)
{
final File zippedFile = intermediaryDataManager.findPartitionFile(
supervisorTaskId,
location.getSubTaskId(),
location.getInterval(),
location.getPartitionId()
);
if (zippedFile == null) {
throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
}
return zippedFile;
}
}
static class LocalParallelIndexTaskClientFactory implements IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>
{
private final ParallelIndexSupervisorTask supervisorTask;
private final ConcurrentMap<String, TaskContainer> tasks;
LocalParallelIndexTaskClientFactory(ParallelIndexSupervisorTask supervisorTask)
LocalParallelIndexTaskClientFactory(SimpleThreadingTaskRunner taskRunner)
{
this.supervisorTask = supervisorTask;
this.tasks = taskRunner.tasks;
}
@Override
@ -397,30 +666,95 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
long numRetries
)
{
return new LocalParallelIndexSupervisorTaskClient(callerId, supervisorTask);
return new LocalParallelIndexSupervisorTaskClient(callerId, tasks);
}
}
static class LocalParallelIndexSupervisorTaskClient extends ParallelIndexSupervisorTaskClient
{
private final ParallelIndexSupervisorTask supervisorTask;
private final ConcurrentMap<String, TaskContainer> tasks;
LocalParallelIndexSupervisorTaskClient(String callerId, ParallelIndexSupervisorTask supervisorTask)
LocalParallelIndexSupervisorTaskClient(String callerId, ConcurrentMap<String, TaskContainer> tasks)
{
super(null, null, null, null, callerId, 0);
this.supervisorTask = supervisorTask;
this.tasks = tasks;
}
@Override
public SegmentIdWithShardSpec allocateSegment(String supervisorTaskId, DateTime timestamp) throws IOException
{
final TaskContainer taskContainer = tasks.get(supervisorTaskId);
final ParallelIndexSupervisorTask supervisorTask = findSupervisorTask(taskContainer);
if (supervisorTask == null) {
throw new ISE("Cannot find supervisor task for [%s]", supervisorTaskId);
}
return supervisorTask.allocateNewSegment(timestamp);
}
@Override
public void report(String supervisorTaskId, SubTaskReport report)
{
final TaskContainer taskContainer = tasks.get(supervisorTaskId);
final ParallelIndexSupervisorTask supervisorTask = findSupervisorTask(taskContainer);
if (supervisorTask == null) {
throw new ISE("Cannot find supervisor task for [%s]", supervisorTaskId);
}
supervisorTask.getCurrentRunner().collectReport(report);
}
@Nullable
private ParallelIndexSupervisorTask findSupervisorTask(TaskContainer taskContainer)
{
if (taskContainer == null) {
return null;
}
if (taskContainer.task instanceof CompactionTask) {
final Task task = ((CompactionTask) taskContainer.task).getCurrentSubTaskHolder().getTask();
if (!(task instanceof ParallelIndexSupervisorTask)) {
return null;
} else {
return (ParallelIndexSupervisorTask) task;
}
} else if (!(taskContainer.task instanceof ParallelIndexSupervisorTask)) {
return null;
} else {
return (ParallelIndexSupervisorTask) taskContainer.task;
}
}
}
class LocalCoordinatorClient extends CoordinatorClient
{
LocalCoordinatorClient()
{
super(null, null);
}
@Override
public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(
String dataSource,
List<Interval> intervals
)
{
return getStorageCoordinator().retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
}
@Override
public DataSegment fetchUsedSegment(String dataSource, String segmentId)
{
ImmutableDruidDataSource druidDataSource =
getSegmentsMetadataManager().getImmutableDataSourceWithUsedSegments(dataSource);
if (druidDataSource == null) {
throw new ISE("Unknown datasource[%s]", dataSource);
}
for (SegmentId possibleSegmentId : SegmentId.iteratePossibleParsingsWithDataSource(dataSource, segmentId)) {
DataSegment segment = druidDataSource.getSegment(possibleSegmentId);
if (segment != null) {
return segment;
}
}
throw new ISE("Can't find segment for id[%s]", segmentId);
}
}
}
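Taken together, these fakes let a sub-task reach its supervisor entirely in-process: the factory resolves the supervisor through the shared task map rather than through a reference captured by a subclassed runner. A minimal sketch of the round trip, assuming a SimpleThreadingTaskRunner named taskRunner and a concrete SubTaskReport named report (both hypothetical here, not part of this commit):
// The client looks up the supervisor by id in the shared task map, so no
// subclassed runner is needed to intercept the call.
ParallelIndexSupervisorTaskClient client =
    new LocalParallelIndexTaskClientFactory(taskRunner).build(null, "subTaskId", 0, null, 0);
client.report("supervisorTaskId", report); // lands in supervisorTask.getCurrentRunner().collectReport(report)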

View File

@ -20,8 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
@ -29,11 +27,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.scan.ScanResultValue;
@ -46,7 +39,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
@ -68,17 +60,14 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")),
new ArrayList<>(),
new ArrayList<>()
),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))),
null,
Arrays.asList("ts", "dim1", "dim2", "val"),
false,
0
);
private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
@ -97,12 +86,9 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
super(lockGranularity, useInputFormatApi);
}
@Override
@Before
public void setup() throws IOException
{
super.setup();
inputDir = temporaryFolder.newFolder("data");
// set up data
for (int i = 0; i < 10; i++) {
@ -128,7 +114,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
{
final Set<DataSegment> publishedSegments = runTestTask(
PARSE_SPEC,
Intervals.of("2017/2018"),
INTERVAL_TO_INDEX,
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
@ -159,181 +145,4 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
}
}
}
@Override
ParallelIndexSupervisorTask createParallelIndexSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
return new TestSupervisorTask(id, taskResource, ingestionSchema, context, indexingServiceClient);
}
private static class TestSupervisorTask extends TestParallelIndexSupervisorTask
{
TestSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
super(id, taskResource, ingestionSchema, context, indexingServiceClient);
}
@Override
public PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(TaskToolbox toolbox)
{
return new TestPartialHashSegmentGenerateRunner(toolbox, this, getIndexingServiceClient());
}
@Override
public PartialHashSegmentMergeParallelIndexTaskRunner createPartialHashSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialHashSegmentMergeIOConfig> ioConfigs
)
{
return new TestPartialHashSegmentMergeParallelIndexTaskRunner(
toolbox,
this,
ioConfigs,
getIndexingServiceClient()
);
}
}
private static class TestPartialHashSegmentGenerateRunner extends PartialHashSegmentGenerateParallelIndexTaskRunner
{
private TestPartialHashSegmentGenerateRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
IndexingServiceClient indexingServiceClient
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema(),
supervisorTask.getContext(),
indexingServiceClient,
new LocalParallelIndexTaskClientFactory(supervisorTask),
new TestAppenderatorsManager()
);
}
}
private static class TestPartialHashSegmentMergeParallelIndexTaskRunner
extends PartialHashSegmentMergeParallelIndexTaskRunner
{
private final ParallelIndexSupervisorTask supervisorTask;
private TestPartialHashSegmentMergeParallelIndexTaskRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
List<PartialHashSegmentMergeIOConfig> mergeIOConfigs,
IndexingServiceClient indexingServiceClient
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema().getDataSchema(),
mergeIOConfigs,
supervisorTask.getIngestionSchema().getTuningConfig(),
supervisorTask.getContext(),
indexingServiceClient
);
this.supervisorTask = supervisorTask;
}
@Override
SubTaskSpec<PartialHashSegmentMergeTask> newTaskSpec(PartialHashSegmentMergeIOConfig ioConfig)
{
final PartialHashSegmentMergeIngestionSpec ingestionSpec =
new PartialHashSegmentMergeIngestionSpec(
supervisorTask.getIngestionSchema().getDataSchema(),
ioConfig,
getTuningConfig()
);
return new SubTaskSpec<PartialHashSegmentMergeTask>(
getTaskId() + "_" + getAndIncrementNextSpecId(),
getGroupId(),
getTaskId(),
getContext(),
new InputSplit<>(ioConfig.getPartitionLocations())
)
{
@Override
public PartialHashSegmentMergeTask newSubTask(int numAttempts)
{
return new TestPartialHashSegmentMergeTask(
null,
getGroupId(),
null,
getSupervisorTaskId(),
numAttempts,
ingestionSpec,
getContext(),
getIndexingServiceClient(),
new LocalParallelIndexTaskClientFactory(supervisorTask),
getToolbox()
);
}
};
}
}
private static class TestPartialHashSegmentMergeTask extends PartialHashSegmentMergeTask
{
private final TaskToolbox toolbox;
private TestPartialHashSegmentMergeTask(
@Nullable String id,
String groupId,
TaskResource taskResource,
String supervisorTaskId,
int numAttempts,
PartialHashSegmentMergeIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
TaskToolbox toolbox
)
{
super(
id,
groupId,
taskResource,
supervisorTaskId,
numAttempts,
ingestionSchema,
context,
indexingServiceClient,
taskClientFactory,
null
);
this.toolbox = toolbox;
}
@Override
File fetchSegmentFile(File partitionDir, HashPartitionLocation location)
{
final File zippedFile = toolbox.getIntermediaryDataManager().findPartitionFile(
getSupervisorTaskId(),
location.getSubTaskId(),
location.getInterval(),
location.getPartitionId()
);
if (zippedFile == null) {
throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
}
return zippedFile;
}
}
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class HttpShuffleClientTest
{
private static final String SUPERVISOR_TASK_ID = "supervisorTaskId";
private static final String SUBTASK_ID = "subtaskId";
private static final Interval INTERVAL = Intervals.of("2019/2020");
private static final String HOST = "host";
private static final int PORT = 1080;
private static final int PARTITION_ID = 0;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ExpectedException expectedException = ExpectedException.none();
private File segmentFile;
@Before
public void setup() throws IOException
{
segmentFile = temporaryFolder.newFile();
try (Writer writer = Files.newBufferedWriter(segmentFile.toPath(), StandardCharsets.UTF_8)) {
for (int j = 0; j < 10; j++) {
writer.write(StringUtils.format("let's write some data.\n"));
}
}
}
@Test
public void testFetchSegmentFileWithValidParamsReturningCopiedFileInPartitionDir() throws IOException
{
ShuffleClient shuffleClient = mockClient(0);
final File localDir = temporaryFolder.newFolder();
final File fetchedFile = shuffleClient.fetchSegmentFile(
localDir,
SUPERVISOR_TASK_ID,
new TestPartitionLocation()
);
Assert.assertEquals(localDir, fetchedFile.getParentFile());
}
@Test
public void testFetchUnknownPartitionThrowingIOExceptionAfterRetries() throws IOException
{
expectedException.expect(IOException.class);
ShuffleClient shuffleClient = mockClient(HttpShuffleClient.NUM_FETCH_RETRIES + 1);
shuffleClient.fetchSegmentFile(
temporaryFolder.newFolder(),
SUPERVISOR_TASK_ID,
new TestPartitionLocation()
);
}
@Test
public void testFetchSegmentFileWithTransientFailuresReturningCopiedFileInPartitionDir() throws IOException
{
ShuffleClient shuffleClient = mockClient(HttpShuffleClient.NUM_FETCH_RETRIES - 1);
final File localDir = temporaryFolder.newFolder();
final File fetchedFile = shuffleClient.fetchSegmentFile(
localDir,
SUPERVISOR_TASK_ID,
new TestPartitionLocation()
);
Assert.assertEquals(localDir, fetchedFile.getParentFile());
}
@Test
public void testFetchSegmentFileWithTwoThreadsReturningCopiedFilesInPartitionDir()
throws IOException, ExecutionException, InterruptedException
{
ExecutorService service = Execs.multiThreaded(2, "http-shuffle-client-test-%d");
ShuffleClient shuffleClient = mockClient(0);
try {
List<Future<File>> futures = new ArrayList<>();
List<File> localDirs = new ArrayList<>();
for (int i = 0; i < 2; i++) {
localDirs.add(temporaryFolder.newFolder());
}
for (int i = 0; i < 2; i++) {
final File localDir = localDirs.get(i);
futures.add(
service.submit(() -> shuffleClient.fetchSegmentFile(
localDir,
SUPERVISOR_TASK_ID,
new TestPartitionLocation()
))
);
}
for (int i = 0; i < futures.size(); i++) {
Assert.assertEquals(localDirs.get(i), futures.get(i).get().getParentFile());
}
}
finally {
service.shutdownNow();
}
}
@Test
public void testFetchSegmentFileWithTwoThreadsAndTransientFailuresReturningCopiedFilesInPartitionDir()
throws IOException, ExecutionException, InterruptedException
{
ExecutorService service = Execs.multiThreaded(2, "http-shuffle-client-test-%d");
ShuffleClient shuffleClient = mockClient(HttpShuffleClient.NUM_FETCH_RETRIES - 1);
try {
List<Future<File>> futures = new ArrayList<>();
List<File> localDirs = new ArrayList<>();
for (int i = 0; i < 2; i++) {
localDirs.add(temporaryFolder.newFolder());
}
for (int i = 0; i < 2; i++) {
final File localDir = localDirs.get(i);
futures.add(
service.submit(() -> shuffleClient.fetchSegmentFile(
localDir,
SUPERVISOR_TASK_ID,
new TestPartitionLocation()
))
);
}
for (int i = 0; i < futures.size(); i++) {
Assert.assertEquals(localDirs.get(i), futures.get(i).get().getParentFile());
}
}
finally {
service.shutdownNow();
}
}
private HttpShuffleClient mockClient(int numFailures) throws FileNotFoundException
{
HttpClient httpClient = EasyMock.strictMock(HttpClient.class);
if (numFailures == 0) {
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject()))
// should return a fresh InputStream instance for each fetch
.andReturn(Futures.immediateFuture(new FileInputStream(segmentFile)))
.andReturn(Futures.immediateFuture(new FileInputStream(segmentFile)));
} else {
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(numFailures)
// should return a fresh InputStream instance for each fetch
.andReturn(Futures.immediateFuture(new FileInputStream(segmentFile)))
.andReturn(Futures.immediateFuture(new FileInputStream(segmentFile)));
}
EasyMock.replay(httpClient);
return new HttpShuffleClient(httpClient);
}
private static class TestPartitionLocation extends PartitionLocation<Integer>
{
private TestPartitionLocation()
{
super(HOST, PORT, false, SUBTASK_ID, INTERVAL, PARTITION_ID);
}
@Override
int getPartitionId()
{
return getSecondaryPartition();
}
}
}
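One subtlety in mockClient: EasyMock replays the chained returns in order, so the expectation encodes "fail numFailures times, then serve two successful fetches", and each success must wrap a fresh FileInputStream because HttpShuffleClient consumes the stream while copying it into the partition directory. Spelled out for a single transient failure (a sketch, not the test's exact chain):
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject()))
        .andReturn(Futures.immediateFailedFuture(new RuntimeException()))      // first attempt fails
        .andReturn(Futures.immediateFuture(new FileInputStream(segmentFile)))  // retry succeeds
        .andReturn(Futures.immediateFuture(new FileInputStream(segmentFile))); // a second fetch needs its own stream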

View File

@ -19,11 +19,13 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.After;
@ -64,28 +66,27 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
}
}
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder("localStorage");
getObjectMapper().registerSubtypes(new NamedType(ReportingNoopTask.class, "reporting_noop"));
}
@Override
@After
public void tearDown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
}
@Test
public void testLargeEstimatedNumSplits() throws Exception
{
final NoopTask task = NoopTask.create();
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
final TestPhaseRunner runner = new TestPhaseRunner(
toolbox,
"supervisorTaskId",
"groupId",
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
Collections.emptyMap(),
indexingServiceClient,
getIndexingServiceClient(),
10,
12
);
@ -95,13 +96,15 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
@Test
public void testSmallEstimatedNumSplits() throws Exception
{
final NoopTask task = NoopTask.create();
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
final TestPhaseRunner runner = new TestPhaseRunner(
toolbox,
"supervisorTaskId",
"groupId",
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
Collections.emptyMap(),
indexingServiceClient,
getIndexingServiceClient(),
10,
8
);
@ -118,7 +121,6 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
String supervisorTaskId,
String groupId,
ParallelIndexTuningConfig tuningConfig,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
int actualNumSubTasks,
int estimatedNumSubTasks
@ -129,7 +131,7 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
supervisorTaskId,
groupId,
tuningConfig,
context,
Collections.emptyMap(),
indexingServiceClient
);
this.actualNumSubTasks = actualNumSubTasks;
@ -223,7 +225,16 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
private ReportingNoopTask(String groupId, TestPhaseRunner phaseRunner)
{
super(null, groupId, null, 10, 0, null, null, null);
super(
null,
groupId,
null,
10,
0,
null,
null,
Collections.singletonMap(AbstractParallelIndexSupervisorTaskTest.DISABLE_TASK_INJECT_CONTEXT_KEY, true)
);
this.phaseRunner = phaseRunner;
}

View File

@ -37,7 +37,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@ -47,20 +46,17 @@ import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest
@ -68,22 +64,10 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
@Rule
public ExpectedException expectedException = ExpectedException.none();
private ExecutorService service;
@Before
public void setup() throws IOException
{
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder("localStorage");
service = Execs.singleThreaded("ParallelIndexSupervisorTaskKillTest-%d");
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
service.shutdownNow();
}
@Test(timeout = 5000L)
@ -99,20 +83,14 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
false
)
);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
final Future<TaskState> future = service.submit(() -> task.run(toolbox).getStatusCode());
getIndexingServiceClient().runTask(task);
while (task.getCurrentRunner() == null) {
Thread.sleep(100);
}
task.stopGracefully(null);
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
future.get();
expectedException.expect(RuntimeException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class));
getIndexingServiceClient().waitToFinish(task, 3000L, TimeUnit.MILLISECONDS);
final TestSinglePhaseParallelIndexTaskRunner runner = (TestSinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
@ -137,8 +115,8 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
false
)
);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
@ -219,8 +197,8 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
// set up test tools
return new TestSupervisorTask(
ingestionSpec,
Collections.emptyMap(),
indexingServiceClient
Collections.singletonMap(AbstractParallelIndexSupervisorTaskTest.DISABLE_TASK_INJECT_CONTEXT_KEY, true),
getIndexingServiceClient()
);
}
@ -282,7 +260,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
}
}
private static class TestSupervisorTask extends TestParallelIndexSupervisorTask
private class TestSupervisorTask extends TestParallelIndexSupervisorTask
{
private final IndexingServiceClient indexingServiceClient;
@ -313,7 +291,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
}
}
private static class TestRunner extends TestSinglePhaseParallelIndexTaskRunner
private class TestRunner extends TestSinglePhaseParallelIndexTaskRunner
{
private final ParallelIndexSupervisorTask supervisorTask;
@ -360,10 +338,8 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
}
}
private static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec
private class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec
{
private final ParallelIndexSupervisorTask supervisorTask;
private TestSinglePhaseSubTaskSpec(
String id,
String groupId,
@ -374,7 +350,6 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
)
{
super(id, groupId, supervisorTask.getId(), ingestionSpec, context, inputSplit);
this.supervisorTask = supervisorTask;
}
@Override
@ -389,7 +364,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
getIngestionSpec(),
getContext(),
null,
new LocalParallelIndexTaskClientFactory(supervisorTask)
getParallelIndexTaskClientFactory()
);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRun
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@ -60,13 +59,11 @@ import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -78,8 +75,6 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -114,24 +109,12 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
*/
private final CopyOnWriteArrayList<TestSubTask> runningTasks = new CopyOnWriteArrayList<>();
private ExecutorService service;
private TestSupervisorTask task;
@Before
public void setup() throws IOException
{
service = Execs.singleThreaded("parallel-index-supervisor-task-resource-test-%d");
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder("localStorage");
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
service.shutdownNow();
}
@Test(timeout = 20000L)
@ -146,12 +129,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
false
)
);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
final Future<TaskStatus> supervisorTaskFuture = service.submit(() -> task.run(toolbox));
getIndexingServiceClient().runTask(task);
Thread.sleep(1000);
final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
@ -263,7 +241,10 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
Thread.sleep(100);
}
Assert.assertEquals(TaskState.SUCCESS, supervisorTaskFuture.get(1000, TimeUnit.MILLISECONDS).getStatusCode());
Assert.assertEquals(
TaskState.SUCCESS,
getIndexingServiceClient().waitToFinish(task, 1000, TimeUnit.MILLISECONDS).getStatusCode()
);
}
@SuppressWarnings({"ConstantConditions"})
@ -467,8 +448,8 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
ingestionSpec,
Collections.emptyMap(),
indexingServiceClient
Collections.singletonMap(AbstractParallelIndexSupervisorTaskTest.DISABLE_TASK_INJECT_CONTEXT_KEY, true),
getIndexingServiceClient()
);
}
@ -531,7 +512,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
return new TestRunner(
toolbox,
this,
indexingServiceClient
getIndexingServiceClient()
);
}
}
@ -587,8 +568,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
private class TestSubTaskSpec extends SinglePhaseSubTaskSpec
{
private final ParallelIndexSupervisorTask supervisorTask;
TestSubTaskSpec(
String id,
String groupId,
@ -599,7 +578,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
)
{
super(id, groupId, supervisorTask.getId(), ingestionSpec, context, inputSplit);
this.supervisorTask = supervisorTask;
}
@Override
@ -618,10 +596,13 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
numAttempts,
getIngestionSpec(),
getContext(),
new LocalParallelIndexTaskClientFactory(supervisorTask)
getParallelIndexTaskClientFactory()
);
final TestInputSource inputSource = (TestInputSource) getIngestionSpec().getIOConfig().getInputSource();
final InputSplit<Integer> split = inputSource.createSplits(getIngestionSpec().getIOConfig().getInputFormat(), null)
final InputSplit<Integer> split = inputSource.createSplits(
getIngestionSpec().getIOConfig().getInputFormat(),
null
)
.findFirst()
.orElse(null);
if (split == null) {
@ -651,7 +632,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
private class TestSubTask extends SinglePhaseSubTask
{
private final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory;
private volatile TaskState state = TaskState.RUNNING;
TestSubTask(
@ -675,7 +655,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
taskClientFactory,
new TestAppenderatorsManager()
);
this.taskClientFactory = taskClientFactory;
}
@Override
@ -686,7 +665,13 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
}
// build LocalParallelIndexTaskClient
final ParallelIndexSupervisorTaskClient taskClient = taskClientFactory.build(null, getId(), 0, null, 0);
final ParallelIndexSupervisorTaskClient taskClient = getParallelIndexTaskClientFactory().build(
null,
getId(),
0,
null,
0
);
final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) getIngestionSchema()
.getTuningConfig()
.getGivenOrDefaultPartitionsSpec();

View File

@ -37,8 +37,8 @@ import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@ -50,6 +50,7 @@ import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -70,7 +71,18 @@ class ParallelIndexTestingFactory
static final IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> TASK_CLIENT_FACTORY =
TestUtils.TASK_CLIENT_FACTORY;
static final AppenderatorsManager APPENDERATORS_MANAGER = TestUtils.APPENDERATORS_MANAGER;
static final HttpClient SHUFFLE_CLIENT = TestUtils.SHUFFLE_CLIENT;
static final ShuffleClient SHUFFLE_CLIENT = new ShuffleClient()
{
@Override
public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
File partitionDir,
String supervisorTaskId,
P location
)
{
return null;
}
};
static final List<Interval> INPUT_INTERVALS = Collections.singletonList(Intervals.ETERNITY);
static final String TASK_EXECUTOR_HOST = "task-executor-host";
static final int TASK_EXECUTOR_PORT = 1;
@ -100,6 +112,11 @@ class ParallelIndexTestingFactory
return TEST_UTILS.getTestObjectMapper();
}
static IndexIO getIndexIO()
{
return TEST_UTILS.getTestIndexIO();
}
@SuppressWarnings("SameParameterValue")
static class TuningConfigBuilder
{

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.segment.TestHelper;
import org.hamcrest.Matchers;
@ -29,9 +28,8 @@ import org.junit.Test;
import java.util.Collections;
public class PartialGenericSegmentMergeTaskTest
public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSupervisorTaskTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation(
ParallelIndexTestingFactory.HOST,
ParallelIndexTestingFactory.PORT,
@ -78,7 +76,7 @@ public class PartialGenericSegmentMergeTaskTest
@Test
public void serializesDeserializes()
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
TestHelper.testSerializesDeserializes(getObjectMapper(), target);
}
@Test

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.segment.TestHelper;
import org.hamcrest.Matchers;
@ -29,9 +28,8 @@ import org.junit.Test;
import java.util.Collections;
public class PartialHashSegmentMergeTaskTest
public class PartialHashSegmentMergeTaskTest extends AbstractParallelIndexSupervisorTaskTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
private static final HashPartitionLocation HASH_PARTITION_LOCATION = new HashPartitionLocation(
ParallelIndexTestingFactory.HOST,
ParallelIndexTestingFactory.PORT,
@ -78,7 +76,7 @@ public class PartialHashSegmentMergeTaskTest
@Test
public void serializesDeserializes()
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
TestHelper.testSerializesDeserializes(getObjectMapper(), target);
}
@Test

View File

@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
@ -40,10 +39,8 @@ import org.junit.rules.ExpectedException;
import java.util.Collections;
public class PartialRangeSegmentGenerateTaskTest
public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSupervisorTaskTest
{
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
@Rule
public ExpectedException exception = ExpectedException.none();
@ -95,7 +92,7 @@ public class PartialRangeSegmentGenerateTaskTest
public void serializesDeserializes()
{
PartialRangeSegmentGenerateTask task = new PartialRangeSegmentGenerateTaskBuilder().build();
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, task);
TestHelper.testSerializesDeserializes(getObjectMapper(), task);
}
@Test

View File

@ -24,9 +24,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.common.config.NullValueHandlingConfig;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
@ -34,16 +32,10 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.PartitionBoundaries;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.hamcrest.Matchers;
import org.joda.time.Interval;
@ -65,10 +57,8 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -82,6 +72,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
private static final int DIM_FILE_CARDINALITY = 2;
private static final int NUM_PARTITION = 2;
private static final int YEAR = 2017;
private static final Interval INTERVAL_TO_INDEX = Intervals.of("%s-12/P1M", YEAR);
private static final String TIME = "ts";
private static final String DIM1 = "dim1";
private static final String DIM2 = "dim2";
@ -94,11 +85,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)),
Collections.emptyList(),
Collections.emptyList()
),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))),
LIST_DELIMITER,
Arrays.asList(TIME, DIM1, DIM2, "val"),
false,
@ -142,11 +129,9 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
this.useMultivalueDim = useMultivalueDim;
}
@Override
@Before
public void setup() throws IOException
{
super.setup();
inputDir = temporaryFolder.newFolder("data");
intervalToDims = createInputFiles(inputDir, useMultivalueDim);
}
@ -209,7 +194,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments = runTestTask(
PARSE_SPEC,
Intervals.of("%s/%s", YEAR, YEAR + 1),
INTERVAL_TO_INDEX,
inputDir,
TEST_FILE_NAME_PREFIX + "*",
new SingleDimensionPartitionsSpec(
@ -303,218 +288,4 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
actualValues.sort(Comparators.naturalNullsFirst());
Assert.assertEquals(interval.toString(), expectedValues, actualValues);
}
@Override
ParallelIndexSupervisorTask createParallelIndexSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
return new TestSupervisorTask(id, taskResource, ingestionSchema, context, indexingServiceClient);
}
private static class TestSupervisorTask extends TestParallelIndexSupervisorTask
{
TestSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
)
{
super(id, taskResource, ingestionSchema, context, indexingServiceClient);
}
@Override
PartialDimensionDistributionParallelIndexTaskRunner createPartialDimensionDistributionRunner(TaskToolbox toolbox)
{
return new TestPartialDimensionDistributionRunner(toolbox, this, getIndexingServiceClient());
}
@Override
PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
Map<Interval, PartitionBoundaries> intervalToPartitions
)
{
return new TestPartialRangeSegmentGenerateRunner(
toolbox,
this,
getIndexingServiceClient(),
intervalToPartitions
);
}
@Override
public PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
TaskToolbox toolbox,
List<PartialGenericSegmentMergeIOConfig> ioConfigs
)
{
return new TestPartialGenericSegmentMergeParallelIndexTaskRunner(
toolbox,
this,
ioConfigs,
getIndexingServiceClient()
);
}
}
private static class TestPartialDimensionDistributionRunner
extends PartialDimensionDistributionParallelIndexTaskRunner
{
private TestPartialDimensionDistributionRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
IndexingServiceClient indexingServiceClient
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema(),
supervisorTask.getContext(),
indexingServiceClient,
new LocalParallelIndexTaskClientFactory(supervisorTask)
);
}
}
private static class TestPartialRangeSegmentGenerateRunner extends PartialRangeSegmentGenerateParallelIndexTaskRunner
{
private TestPartialRangeSegmentGenerateRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
IndexingServiceClient indexingServiceClient,
Map<Interval, PartitionBoundaries> intervalToPartitions
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema(),
supervisorTask.getContext(),
indexingServiceClient,
intervalToPartitions,
new LocalParallelIndexTaskClientFactory(supervisorTask),
new TestAppenderatorsManager()
);
}
}
private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
extends PartialGenericSegmentMergeParallelIndexTaskRunner
{
private final ParallelIndexSupervisorTask supervisorTask;
private TestPartialGenericSegmentMergeParallelIndexTaskRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
List<PartialGenericSegmentMergeIOConfig> mergeIOConfigs,
IndexingServiceClient indexingServiceClient
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema().getDataSchema(),
mergeIOConfigs,
supervisorTask.getIngestionSchema().getTuningConfig(),
supervisorTask.getContext(),
indexingServiceClient
);
this.supervisorTask = supervisorTask;
}
@Override
SubTaskSpec<PartialGenericSegmentMergeTask> newTaskSpec(PartialGenericSegmentMergeIOConfig ioConfig)
{
final PartialGenericSegmentMergeIngestionSpec ingestionSpec =
new PartialGenericSegmentMergeIngestionSpec(
supervisorTask.getIngestionSchema().getDataSchema(),
ioConfig,
getTuningConfig()
);
return new SubTaskSpec<PartialGenericSegmentMergeTask>(
getTaskId() + "_" + getAndIncrementNextSpecId(),
getGroupId(),
getTaskId(),
getContext(),
new InputSplit<>(ioConfig.getPartitionLocations())
)
{
@Override
public PartialGenericSegmentMergeTask newSubTask(int numAttempts)
{
return new TestPartialGenericSegmentMergeTask(
null,
getGroupId(),
null,
getSupervisorTaskId(),
numAttempts,
ingestionSpec,
getContext(),
getIndexingServiceClient(),
new LocalParallelIndexTaskClientFactory(supervisorTask),
getToolbox()
);
}
};
}
}
private static class TestPartialGenericSegmentMergeTask extends PartialGenericSegmentMergeTask
{
private final TaskToolbox toolbox;
private TestPartialGenericSegmentMergeTask(
@Nullable String id,
String groupId,
TaskResource taskResource,
String supervisorTaskId,
int numAttempts,
PartialGenericSegmentMergeIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient,
IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
TaskToolbox toolbox
)
{
super(
id,
groupId,
taskResource,
supervisorTaskId,
numAttempts,
ingestionSchema,
context,
indexingServiceClient,
taskClientFactory,
null
);
this.toolbox = toolbox;
}
@Override
File fetchSegmentFile(File partitionDir, GenericPartitionLocation location)
{
final File zippedFile = toolbox.getIntermediaryDataManager().findPartitionFile(
getSupervisorTaskId(),
location.getSubTaskId(),
location.getInterval(),
location.getPartitionId()
);
if (zippedFile == null) {
throw new ISE("Can't find segment file for location[%s] at path[%s]", location);
}
return zippedFile;
}
}
}

View File

@ -19,17 +19,15 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
@ -61,7 +59,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@ -80,6 +77,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
);
}
private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
@ -110,25 +109,21 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
}
}
indexingServiceClient = new LocalIndexingServiceClient();
localDeepStorage = temporaryFolder.newFolder("localStorage");
getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
}
@After
public void teardown()
{
indexingServiceClient.shutdown();
temporaryFolder.delete();
}
@Test
public void testIsReady() throws Exception
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017/2018"), false, true);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
final ParallelIndexSupervisorTask task = newTask(INTERVAL_TO_INDEX, false, true);
final TaskActionClient actionClient = createActionClient(task);
final TaskToolbox toolbox = createTaskToolbox(task, actionClient);
prepareTaskForLocking(task);
Assert.assertTrue(task.isReady(actionClient));
@ -145,7 +140,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
0,
spec.getIngestionSpec(),
spec.getContext(),
indexingServiceClient,
getIndexingServiceClient(),
null,
new TestAppenderatorsManager()
);
@ -156,26 +151,18 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity, boolean appendToExisting)
throws Exception
{
final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
shutdownTask(task);
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity) throws Exception
private void runTestTask(@Nullable Interval interval, Granularity segmentGranularity)
{
runTestTask(interval, segmentGranularity, false);
}
private void testRunAndOverwrite(@Nullable Interval inputInterval, Granularity secondSegmentGranularity)
throws Exception
{
// Ingest all data.
runTestTask(inputInterval, Granularities.DAY);
@ -197,62 +184,52 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
@Test
public void testWithoutInterval() throws Exception
public void testWithoutInterval()
{
testRunAndOverwrite(null, Granularities.DAY);
}
@Test()
public void testRunInParallel() throws Exception
public void testRunInParallel()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017/2018"), Granularities.DAY);
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
}
@Test
public void testWithoutIntervalWithDifferentSegmentGranularity() throws Exception
public void testWithoutIntervalWithDifferentSegmentGranularity()
{
testRunAndOverwrite(null, Granularities.MONTH);
}
@Test()
public void testRunInParallelWithDifferentSegmentGranularity() throws Exception
public void testRunInParallelWithDifferentSegmentGranularity()
{
// Ingest all data.
testRunAndOverwrite(Intervals.of("2017/2018"), Granularities.MONTH);
testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.MONTH);
}
@Test
public void testRunInSequential() throws Exception
public void testRunInSequential()
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017/2018"), false, false);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017-12/P1M"), false, false);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
@Test
public void testPublishEmptySegments() throws Exception
public void testPublishEmptySegments()
{
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2020/2021"), false, true);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
final ParallelIndexSupervisorTask task = newTask(Intervals.of("2020-12/P1M"), false, true);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
}
@Test
public void testWith1MaxNumConcurrentSubTasks() throws Exception
public void testWith1MaxNumConcurrentSubTasks()
{
final ParallelIndexSupervisorTask task = newTask(
Intervals.of("2017/2018"),
Intervals.of("2017-12/P1M"),
Granularities.DAY,
false,
true,
@ -285,20 +262,15 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null
)
);
actionClient = createActionClient(task);
toolbox = createTaskToolbox(task);
prepareTaskForLocking(task);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
Assert.assertTrue(task.isReady(actionClient));
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
}
@Test
public void testAppendToExisting() throws Exception
public void testAppendToExisting()
{
final Interval interval = Intervals.of("2017/2018");
final Interval interval = Intervals.of("2017-12/P1M");
runTestTask(interval, Granularities.DAY, true);
final Collection<DataSegment> oldSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
@ -365,14 +337,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
),
new ParallelIndexIOConfig(
null,
new LocalInputSource(inputDir, "test_*")
{
@Override
public boolean isSplittable()
{
return splittableInputSource;
}
},
new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting
),
@ -409,114 +374,45 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
}
// set up test tools
return new TestSupervisorTask(
return new ParallelIndexSupervisorTask(
null,
null,
null,
ingestionSpec,
new HashMap<>(),
indexingServiceClient
Collections.emptyMap(),
getIndexingServiceClient(),
null,
null,
null,
null
);
}
public static class TestSupervisorTask extends TestParallelIndexSupervisorTask
private static class SettableSplittableLocalInputSource extends LocalInputSource
{
public TestSupervisorTask(
String id,
TaskResource taskResource,
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context,
IndexingServiceClient indexingServiceClient
private final boolean splittableInputSource;
@JsonCreator
private SettableSplittableLocalInputSource(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
@JsonProperty("splittableInputSource") boolean splittableInputSource
)
{
super(id, taskResource, ingestionSchema, context, indexingServiceClient);
super(baseDir, filter);
this.splittableInputSource = splittableInputSource;
}
@JsonProperty
public boolean isSplittableInputSource()
{
return splittableInputSource;
}
@Override
SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox toolbox)
public boolean isSplittable()
{
return new TestSinglePhaseRunner(toolbox, this, getIndexingServiceClient());
}
}
public static class TestSinglePhaseRunner extends TestSinglePhaseParallelIndexTaskRunner
{
private final ParallelIndexSupervisorTask supervisorTask;
TestSinglePhaseRunner(
TaskToolbox toolbox,
ParallelIndexSupervisorTask supervisorTask,
IndexingServiceClient indexingServiceClient
)
{
super(
toolbox,
supervisorTask.getId(),
supervisorTask.getGroupId(),
supervisorTask.getIngestionSchema(),
supervisorTask.getContext(),
indexingServiceClient
);
this.supervisorTask = supervisorTask;
}
@Override
SinglePhaseSubTaskSpec newTaskSpec(InputSplit split)
{
final SplittableInputSource baseInputSource = (SplittableInputSource) getIngestionSchema()
.getIOConfig()
.getNonNullInputSource(getIngestionSchema().getDataSchema().getParser());
return new TestSinglePhaseSubTaskSpec(
supervisorTask.getId() + "_" + getAndIncrementNextSpecId(),
supervisorTask.getGroupId(),
supervisorTask,
new ParallelIndexIngestionSpec(
getIngestionSchema().getDataSchema(),
new ParallelIndexIOConfig(
null,
baseInputSource.withSplit(split),
getIngestionSchema().getIOConfig().getInputFormat(),
getIngestionSchema().getIOConfig().isAppendToExisting()
),
getIngestionSchema().getTuningConfig()
),
supervisorTask.getContext(),
split
);
}
}
public static class TestSinglePhaseSubTaskSpec extends SinglePhaseSubTaskSpec
{
private final ParallelIndexSupervisorTask supervisorTask;
TestSinglePhaseSubTaskSpec(
String id,
String groupId,
ParallelIndexSupervisorTask supervisorTask,
ParallelIndexIngestionSpec ingestionSpec,
Map<String, Object> context,
InputSplit inputSplit
)
{
super(id, groupId, supervisorTask.getId(), ingestionSpec, context, inputSplit);
this.supervisorTask = supervisorTask;
}
@Override
public SinglePhaseSubTask newSubTask(int numAttempts)
{
return new SinglePhaseSubTask(
null,
getGroupId(),
null,
getSupervisorTaskId(),
numAttempts,
getIngestionSpec(),
getContext(),
null,
new LocalParallelIndexTaskClientFactory(supervisorTask),
new TestAppenderatorsManager()
);
return splittableInputSource;
}
}
}
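SettableSplittableLocalInputSource exists so a single test spec can flip between parallel and sequential execution: the supervisor only fans out into sub-tasks when the input source reports itself splittable. The check is roughly the following (a simplified sketch of the condition in ParallelIndexSupervisorTask, not code from this commit):
// Parallel mode requires a splittable input source and room for more than
// one concurrent sub-task; otherwise the task falls back to sequential mode.
boolean parallel = ioConfig.getInputSource().isSplittable()
    && tuningConfig.getMaxNumConcurrentSubTasks() > 1;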

View File

@ -29,7 +29,6 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
@ -43,8 +42,6 @@ import java.util.Collection;
*/
public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
{
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class);
private final File baseDir;
private final String filter;
@Nullable
@ -86,7 +83,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
protected Collection<File> initObjects()
{
return FileUtils.listFiles(
Preconditions.checkNotNull(baseDir).getAbsoluteFile(),
Preconditions.checkNotNull(baseDir, "baseDir").getAbsoluteFile(),
new WildcardFileFilter(filter),
TrueFileFilter.INSTANCE
);

View File

@ -124,15 +124,10 @@ public class CliIndexer extends ServerRunnable
binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class);
CliPeon.bindRowIngestionMeters(binder);
CliPeon.bindChatHandler(binder);
CliPeon.bindPeonDataSegmentHandlers(binder);
CliPeon.bindRealtimeCache(binder);
CliPeon.bindCoordinatorHandoffNotiferAndClient(binder);
CliMiddleManager.bindWorkerManagementClasses(binder);
binder.bind(AppenderatorsManager.class)

View File

@ -50,6 +50,7 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.worker.Worker;
@ -113,6 +114,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
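          // The parallel-index client and shuffle client are only exercised inside peons;
          // the middleManager process itself presumably never uses them, so they are bound
          // to null providers here to satisfy injection.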
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {})
.toProvider(Providers.of(null));
binder.bind(ShuffleClient.class).toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
PolyBind.createChoice(
binder,

View File

@ -63,6 +63,7 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import org.apache.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory;
@ -200,8 +201,8 @@ public class CliOverlord extends ServerRunnable
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>()
{
})
.toProvider(Providers.of(null));
}).toProvider(Providers.of(null));
binder.bind(ShuffleClient.class).toProvider(Providers.of(null));
binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
PolyBind.createChoice(

View File

@ -78,8 +78,10 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
@ -186,11 +188,8 @@ public class CliPeon extends GuiceRunnable
JsonConfigProvider.bind(binder, "druid.task.executor", DruidNode.class, Parent.class);
bindRowIngestionMeters(binder);
bindChatHandler(binder);
bindTaskConfigAndClients(binder);
bindPeonDataSegmentHandlers(binder);
binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
@ -201,18 +200,14 @@ public class CliPeon extends GuiceRunnable
.setStatusFile(new File(taskStatusPath))
);
binder.bind(TaskReportFileWriter.class).toInstance(
new SingleFileTaskReportFileWriter(
new File(taskReportPath)
)
);
binder.bind(TaskReportFileWriter.class)
.toInstance(new SingleFileTaskReportFileWriter(new File(taskReportPath)));
binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
bindRealtimeCache(binder);
bindCoordinatorHandoffNotiferAndClient(binder);
binder.bind(AppenderatorsManager.class)
@ -407,6 +402,7 @@ public class CliPeon extends GuiceRunnable
configureTaskActionClient(binder);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
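        // Peons run the actual subtasks, so they get the real HTTP-based implementations
        // rather than the null providers bound in the overlord and middleManager above.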
binder.bind(ShuffleClient.class).to(HttpShuffleClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>(){})
.to(ParallelIndexTaskClientFactory.class)