>> pair = prepareSegments(
@@ -379,7 +403,14 @@ public class CompactionTask extends AbstractTask
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
- createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+ createIoConfig(
+ toolbox,
+ dataSchema,
+ segmentProvider.interval,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
+ ),
compactionTuningConfig
)
);
@@ -411,7 +442,14 @@ public class CompactionTask extends AbstractTask
specs.add(
new IndexIngestionSpec(
dataSchema,
- createIoConfig(toolbox, dataSchema, interval),
+ createIoConfig(
+ toolbox,
+ dataSchema,
+ interval,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
+ ),
compactionTuningConfig
)
);
@@ -438,7 +476,14 @@ public class CompactionTask extends AbstractTask
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
- createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+ createIoConfig(
+ toolbox,
+ dataSchema,
+ segmentProvider.interval,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
+ ),
compactionTuningConfig
)
);
@@ -446,7 +491,14 @@ public class CompactionTask extends AbstractTask
}
}
- private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+ private static IndexIOConfig createIoConfig(
+ TaskToolbox toolbox,
+ DataSchema dataSchema,
+ Interval interval,
+ CoordinatorClient coordinatorClient,
+ SegmentLoaderFactory segmentLoaderFactory,
+ RetryPolicyFactory retryPolicyFactory
+ )
{
return new IndexIOConfig(
new IngestSegmentFirehoseFactory(
@@ -456,7 +508,10 @@ public class CompactionTask extends AbstractTask
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
- toolbox.getIndexIO()
+ toolbox.getIndexIO(),
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
),
false
);
@@ -811,7 +866,7 @@ public class CompactionTask extends AbstractTask
* targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment},
* {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
* {@link #hasPartitionConfig} checks one of those configs is set.
- *
+ *
* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
* returns true. If targetCompactionSizeBytes is not set, this returns null or
* {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
@@ -860,6 +915,9 @@ public class CompactionTask extends AbstractTask
private final AuthorizerMapper authorizerMapper;
private final ChatHandlerProvider chatHandlerProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
+ private final CoordinatorClient coordinatorClient;
+ private final SegmentLoaderFactory segmentLoaderFactory;
+ private final RetryPolicyFactory retryPolicyFactory;
@Nullable
private Interval interval;
@@ -885,7 +943,10 @@ public class CompactionTask extends AbstractTask
ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ CoordinatorClient coordinatorClient,
+ SegmentLoaderFactory segmentLoaderFactory,
+ RetryPolicyFactory retryPolicyFactory
)
{
this.dataSource = dataSource;
@@ -893,6 +954,9 @@ public class CompactionTask extends AbstractTask
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+ this.coordinatorClient = coordinatorClient;
+ this.segmentLoaderFactory = segmentLoaderFactory;
+ this.retryPolicyFactory = retryPolicyFactory;
}
public Builder interval(Interval interval)
@@ -968,7 +1032,10 @@ public class CompactionTask extends AbstractTask
jsonMapper,
authorizerMapper,
chatHandlerProvider,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 6d5a0d8e56a..d0e083a2b6e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
@@ -84,7 +83,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -419,8 +417,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
- setFirehoseFactoryToolbox(firehoseFactory, toolbox);
-
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir);
@@ -489,25 +485,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
}
- // pass toolbox to any IngestSegmentFirehoseFactory
- private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
- {
- if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
- ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
- return;
- }
-
- if (firehoseFactory instanceof CombiningFirehoseFactory) {
- for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
- if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
- ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
- } else if (delegateFactory instanceof CombiningFirehoseFactory) {
- setFirehoseFactoryToolbox(delegateFactory, toolbox);
- }
- }
- }
- }
-
private Map getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 8004243bb3a..435de05892f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@@ -185,11 +184,6 @@ public class ParallelIndexSubTask extends AbstractTask
{
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
- if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
- // pass toolbox to Firehose
- ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
- }
-
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 8087582ecb5..bae2946bbdc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -30,16 +30,20 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
+import org.apache.druid.indexing.common.RetryPolicy;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -48,14 +52,17 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -68,7 +75,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory dimensions;
private final List metrics;
private final IndexIO indexIO;
- private TaskToolbox taskToolbox;
+ private final CoordinatorClient coordinatorClient;
+ private final SegmentLoaderFactory segmentLoaderFactory;
+ private final RetryPolicyFactory retryPolicyFactory;
@JsonCreator
public IngestSegmentFirehoseFactory(
@@ -77,7 +86,10 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory dimensions,
@JsonProperty("metrics") List metrics,
- @JacksonInject IndexIO indexIO
+ @JacksonInject IndexIO indexIO,
+ @JacksonInject CoordinatorClient coordinatorClient,
+ @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+ @JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@@ -88,6 +100,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory usedSegments = taskToolbox
- .getTaskActionClient()
- .submit(new SegmentListUsedAction(dataSource, interval, null));
- final Map segmentFileMap = taskToolbox.fetchSegments(usedSegments);
+ // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
+ // as TaskActionClient.
+ final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
+ List usedSegments;
+ while (true) {
+ try {
+ usedSegments =
+ coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
+ break;
+ }
+ catch (Throwable e) {
+ log.warn(e, "Exception getting database segments");
+ final Duration delay = retryPolicy.getAndIncrementRetryDelay();
+ if (delay == null) {
+ throw e;
+ } else {
+ final long sleepTime = jitter(delay.getMillis());
+ log.info("Will try again in [%s].", new Duration(sleepTime).toString());
+ try {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException e2) {
+ throw new RuntimeException(e2);
+ }
+ }
+ }
+ }
+
+ final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+ Map segmentFileMap = Maps.newLinkedHashMap();
+ for (DataSegment segment : usedSegments) {
+ segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+ }
+
final List> timeLineSegments = VersionedIntervalTimeline
.forSegments(usedSegments)
.lookup(interval);
@@ -201,11 +239,18 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory getUniqueDimensions(
List> timelineSegments,
@@ -260,7 +305,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory orderedMetrics = uniqueMetrics.inverse();
return IntStream.range(0, orderedMetrics.size())
- .mapToObj(orderedMetrics::get)
- .collect(Collectors.toList());
+ .mapToObj(orderedMetrics::get)
+ .collect(Collectors.toList());
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index d9acfd6bd83..0966d1b84c9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -76,6 +76,7 @@ public class TaskToolboxTest
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
private ObjectMapper ObjectMapper = new ObjectMapper();
+ private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
private Task task = EasyMock.createMock(Task.class);
private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
@@ -107,7 +108,7 @@ public class TaskToolboxTest
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
- new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
+ mockSegmentLoaderFactory,
ObjectMapper,
mockIndexIO,
mockCache,
@@ -162,13 +163,13 @@ public class TaskToolboxTest
public void testFetchSegments() throws SegmentLoadingException, IOException
{
File expectedFile = temporaryFolder.newFile();
+ EasyMock
+ .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject()))
+ .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
EasyMock
.expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject()))
.andReturn(expectedFile).anyTimes();
- EasyMock
- .expect(mockSegmentLoaderLocalCacheManager.withConfig(EasyMock.anyObject()))
- .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
- EasyMock.replay(mockSegmentLoaderLocalCacheManager);
+ EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager);
DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build();
List segments = ImmutableList.of
(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index b57397ab8cd..b809e7a5ee8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -119,7 +119,6 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -1607,9 +1606,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
- new SegmentLoaderFactory(
- new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
- ),
+ new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index bbf28708ff8..6ad0ec4b146 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
@@ -52,6 +56,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -96,12 +101,24 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private CoordinatorClient coordinatorClient;
+ private SegmentLoaderFactory segmentLoaderFactory;
private ExecutorService exec;
+ private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
public CompactionTaskRunTest()
{
TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
+ coordinatorClient = new CoordinatorClient(null, null)
+ {
+ @Override
+ public List getDatabaseSegmentDataSourceSegments(String dataSource, List intervals)
+ {
+ return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals);
+ }
+ };
+ segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
}
@Before
@@ -126,7 +143,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
);
final CompactionTask compactionTask = builder
@@ -156,7 +176,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
);
final CompactionTask compactionTask1 = builder
@@ -200,7 +223,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
);
final CompactionTask compactionTask1 = builder
@@ -248,7 +274,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
getObjectMapper(),
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ coordinatorClient,
+ segmentLoaderFactory,
+ retryPolicyFactory
);
// day segmentGranularity
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f759cc7c1c6..5117c1a31c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -41,6 +42,9 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
@@ -153,12 +157,15 @@ public class CompactionTaskTest
private static List AGGREGATORS;
private static List SEGMENTS;
private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+ private static Map segmentMap = new HashMap<>();
+ private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap);
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
- private static Map segmentMap;
+ private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
private final boolean keepSegmentGranularity;
private TaskToolbox toolbox;
+ private SegmentLoaderFactory segmentLoaderFactory;
@BeforeClass
public static void setupClass()
@@ -202,7 +209,6 @@ public class CompactionTaskTest
AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
- segmentMap = new HashMap<>(SEGMENT_INTERVALS.size());
for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
segmentMap.put(
@@ -243,6 +249,8 @@ public class CompactionTaskTest
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
+ binder.bind(CoordinatorClient.class).toInstance(coordinatorClient);
+ binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
}
)
)
@@ -307,19 +315,21 @@ public class CompactionTaskTest
@Before
public void setup()
{
+ final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap);
toolbox = new TestTaskToolbox(
new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
- new TestIndexIO(objectMapper, segmentMap),
+ testIndexIO,
segmentMap
);
+ segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper);
}
@Parameters(name = "keepSegmentGranularity={0}")
public static Collection