Remove Injector from IngestSegmentFirehoseFactory (#5045)

* Add lock check to segment list actions

* fix test

* remove lock check
Authored by Jihoon Son on 2017-11-21 08:54:35 +09:00; committed by Gian Merlino
Parent: dbb37b727d
Commit: 645de02fb2
20 changed files with 302 additions and 135 deletions

View File

@ -25,16 +25,20 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.java.util.common.ISE;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
public class TaskActionPreconditions
{
public static void checkLockCoversSegments(
static void checkLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final Set<DataSegment> segments
final Collection<DataSegment> segments
)
{
if (!isLockCoversSegments(task, taskLockbox, segments)) {
@ -46,7 +50,7 @@ public class TaskActionPreconditions
static boolean isLockCoversSegments(
final Task task,
final TaskLockbox taskLockbox,
final Set<DataSegment> segments
final Collection<DataSegment> segments
)
{
// Verify that each of these segments falls under some lock
@ -55,19 +59,31 @@ public class TaskActionPreconditions
// NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
// NOTE: insert some segments from the task but not others.
final List<TaskLock> taskLocks = taskLockbox.findLocksForTask(task);
for (final DataSegment segment : segments) {
final boolean ok = taskLocks.stream().anyMatch(
taskLock -> taskLock.getDataSource().equals(segment.getDataSource())
&& taskLock.getInterval().contains(segment.getInterval())
&& taskLock.getVersion().compareTo(segment.getVersion()) >= 0
);
if (!ok) {
return false;
}
final NavigableMap<DateTime, TaskLock> taskLockMap = getTaskLockMap(taskLockbox, task);
if (taskLockMap.isEmpty()) {
return false;
}
return true;
return segments.stream().allMatch(
segment -> {
final Entry<DateTime, TaskLock> entry = taskLockMap.floorEntry(segment.getInterval().getStart());
if (entry == null) {
return false;
}
final TaskLock taskLock = entry.getValue();
return taskLock.getInterval().contains(segment.getInterval()) &&
taskLock.getDataSource().equals(segment.getDataSource()) &&
taskLock.getVersion().compareTo(segment.getVersion()) >= 0;
}
);
}
private static NavigableMap<DateTime, TaskLock> getTaskLockMap(TaskLockbox taskLockbox, Task task)
{
final List<TaskLock> taskLocks = taskLockbox.findLocksForTask(task);
final NavigableMap<DateTime, TaskLock> taskLockMap = new TreeMap<>();
taskLocks.forEach(taskLock -> taskLockMap.put(taskLock.getInterval().getStart(), taskLock));
return taskLockMap;
}
}
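
A note on the rewrite above: the old loop scanned every task lock for every segment; the new version builds a NavigableMap of locks keyed by interval start, and for each segment consults only floorEntry(segment start), i.e. the lock whose interval begins nearest at-or-before the segment. A minimal self-contained sketch of that floor-entry pattern (the Span type and long millis are simplified stand-ins for TaskLock, DataSegment, and Joda DateTimes):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class FloorEntryLookupSketch
{
  // Simplified stand-in for a lock or segment interval: [start, end).
  static class Span
  {
    final long start;
    final long end;

    Span(long start, long end)
    {
      this.start = start;
      this.end = end;
    }

    boolean contains(Span other)
    {
      return start <= other.start && other.end <= end;
    }
  }

  public static void main(String[] args)
  {
    // Locks keyed by interval start, as in getTaskLockMap() above.
    final NavigableMap<Long, Span> lockMap = new TreeMap<>();
    lockMap.put(0L, new Span(0, 100)); // one coarse lock covering [0, 100)

    // Segment inside the lock: floorEntry(30) finds the lock starting at 0.
    final Span segment = new Span(30, 40);
    final Map.Entry<Long, Span> e1 = lockMap.floorEntry(segment.start);
    System.out.println(e1 != null && e1.getValue().contains(segment)); // true

    // Segment spilling past the lock's end fails the containment check.
    final Span tooWide = new Span(90, 110);
    final Map.Entry<Long, Span> e2 = lockMap.floorEntry(tooWide.start);
    System.out.println(e2 != null && e2.getValue().contains(tooWide)); // false

    // Segment starting before every lock: floorEntry returns null.
    System.out.println(lockMap.floorEntry(-1L) != null); // false
  }
}

This also lets one coarse lock cover many fine-grained segments (exercised by the new testCheckLargeLockCoversSegments further down), at the cost that only the nearest-starting lock is ever considered for a given segment.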

View File

@ -30,7 +30,6 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.data.input.impl.DimensionsSpec;
@ -96,7 +95,6 @@ public class CompactionTask extends AbstractTask
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
private final IndexTuningConfig tuningConfig;
private final Injector injector;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@ -114,7 +112,6 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject Injector injector,
@JacksonInject ObjectMapper jsonMapper
)
{
@ -126,7 +123,6 @@ public class CompactionTask extends AbstractTask
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
this.tuningConfig = tuningConfig;
this.injector = injector;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
}
@ -190,7 +186,6 @@ public class CompactionTask extends AbstractTask
segmentProvider,
dimensionsSpec,
tuningConfig,
injector,
jsonMapper
);
@ -220,7 +215,6 @@ public class CompactionTask extends AbstractTask
SegmentProvider segmentProvider,
DimensionsSpec dimensionsSpec,
IndexTuningConfig tuningConfig,
Injector injector,
ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
{
@ -254,7 +248,6 @@ public class CompactionTask extends AbstractTask
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
injector,
toolbox.getIndexIO()
),
false
@ -484,7 +477,7 @@ public class CompactionTask extends AbstractTask
Collections.sort(segments);
Preconditions.checkState(
usedSegments.equals(segments),
"Specified segments[%s] are different from the currently used segments[%s]",
"Specified segments[%s] are different from the current used segments[%s]",
segments,
usedSegments
);
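
With the Injector gone from CompactionTask, the remaining @JacksonInject ObjectMapper is resolved purely through Jackson's injectable values (the tests below wire this up with GuiceInjectableValues). A minimal sketch of the mechanism with hypothetical class names, assuming only jackson-databind:

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonInjectSketch
{
  static class Widget
  {
    final String name;
    final ObjectMapper mapper;

    @JsonCreator
    Widget(@JsonProperty("name") String name, @JacksonInject ObjectMapper mapper)
    {
      this.name = name;
      this.mapper = mapper;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Register the mapper itself as an injectable, keyed by its declared type;
    // Jackson supplies it to the @JacksonInject constructor parameter.
    mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, mapper));

    final Widget w = mapper.readValue("{\"name\":\"w1\"}", Widget.class);
    System.out.println(w.name + " " + (w.mapper == mapper)); // w1 true
  }
}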

View File

@ -68,6 +68,7 @@ public class NoopTask extends AbstractTask
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@ -77,7 +78,7 @@ public class NoopTask extends AbstractTask
{
super(
id == null ? StringUtils.format("noop_%s_%s", DateTimes.nowUtc(), UUID.randomUUID().toString()) : id,
"none",
dataSource == null ? "none" : dataSource,
context
);
@ -152,18 +153,24 @@ public class NoopTask extends AbstractTask
public static NoopTask create()
{
return new NoopTask(null, 0, 0, null, null, null);
return new NoopTask(null, null, 0, 0, null, null, null);
}
@VisibleForTesting
public static NoopTask create(String dataSource)
{
return new NoopTask(null, dataSource, 0, 0, null, null, null);
}
@VisibleForTesting
public static NoopTask create(int priority)
{
return new NoopTask(null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
return new NoopTask(null, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
}
@VisibleForTesting
public static NoopTask create(String id, int priority)
{
return new NoopTask(id, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
return new NoopTask(id, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
}
}
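
The new dataSource parameter threads through every factory method; the firehose tests below rely on NoopTask.create(dataSource) so that the task's locks and the segments under test share a data source. A usage sketch, assuming the post-commit indexing-service classes ("wikipedia" is a hypothetical name):

import io.druid.indexing.common.task.NoopTask;

public class NoopTaskUsageSketch
{
  public static void main(String[] args)
  {
    // New overload: carry a real data source instead of the hard-coded "none".
    final NoopTask task = NoopTask.create("wikipedia");
    // Old behavior is preserved: no data source still means "none".
    final NoopTask legacy = NoopTask.create();
    System.out.println(task.getDataSource());   // wikipedia
    System.out.println(legacy.getDataSource()); // none
  }
}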

View File

@ -30,23 +30,20 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import io.druid.segment.transform.TransformSpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -70,7 +67,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
private final DimFilter dimFilter;
private final List<String> dimensions;
private final List<String> metrics;
private final Injector injector;
private final IndexIO indexIO;
private TaskToolbox taskToolbox;
@ -81,7 +77,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics,
@JacksonInject Injector injector,
@JacksonInject IndexIO indexIO
)
{
@ -92,7 +87,6 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
this.dimFilter = dimFilter;
this.dimensions = dimensions;
this.metrics = metrics;
this.injector = injector;
this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
}
@ -136,12 +130,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
{
log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
if (taskToolbox == null) {
// Noop Task is just used to create the toolbox and list segments.
taskToolbox = injector.getInstance(TaskToolboxFactory.class).build(
new NoopTask("reingest", 0, 0, null, null, null)
);
}
Preconditions.checkNotNull(taskToolbox, "taskToolbox is not set");
try {
final List<DataSegment> usedSegments = taskToolbox
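
The connect() change above is the heart of the commit: the factory used to build its own TaskToolbox from an injected Guice Injector plus a throwaway NoopTask, and now the caller must hand the toolbox in before connect(), which the checkNotNull enforces. A wiring sketch under stated assumptions (the "wikipedia" dataSource and the interval are hypothetical; toolbox and indexIO come from the task runtime, e.g. TaskToolboxFactory.build(task), as the updated tests below do):

import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.Intervals;
import io.druid.segment.IndexIO;

public class FirehoseWiringSketch
{
  static IngestSegmentFirehoseFactory makeFactory(TaskToolbox toolbox, IndexIO indexIO)
  {
    final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
        "wikipedia",                           // hypothetical dataSource
        Intervals.of("2017-10-01/2017-10-15"), // hypothetical interval
        null,                                  // dimFilter: keep all rows
        null,                                  // dimensions: derive from segments
        null,                                  // metrics: derive from segments
        indexIO
    );
    // New contract: supply the toolbox up front; connect() no longer builds
    // one lazily and fails its checkNotNull if this step is skipped.
    factory.setTaskToolbox(toolbox);
    return factory;
  }
}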

View File

@ -94,7 +94,7 @@ public class RemoteTaskActionClientTest
replay(druidLeaderClient);
Task task = new NoopTask("id", 0, 0, null, null, null);
Task task = new NoopTask("id", null, 0, 0, null, null, null);
RemoteTaskActionClient client = new RemoteTaskActionClient(
task, druidLeaderClient, new RetryPolicyFactory(
new RetryPolicyConfig()
@ -134,7 +134,7 @@ public class RemoteTaskActionClientTest
replay(druidLeaderClient);
Task task = new NoopTask("id", 0, 0, null, null, null);
Task task = new NoopTask("id", null, 0, 0, null, null, null);
RemoteTaskActionClient client = new RemoteTaskActionClient(
task, druidLeaderClient, new RetryPolicyFactory(
objectMapper.readValue("{\"maxRetryCount\":0}", RetryPolicyConfig.class)

View File

@ -107,7 +107,7 @@ public class SegmentAllocateActionTest
@Test
public void testManySegmentsSameInterval() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
@ -182,7 +182,7 @@ public class SegmentAllocateActionTest
@Test
public void testResumeSequence() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
@ -306,7 +306,7 @@ public class SegmentAllocateActionTest
@Test
public void testMultipleSequences() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
@ -419,7 +419,7 @@ public class SegmentAllocateActionTest
@Test
public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
@ -480,7 +480,7 @@ public class SegmentAllocateActionTest
@Test
public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
@ -541,7 +541,7 @@ public class SegmentAllocateActionTest
@Test
public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
@ -578,7 +578,7 @@ public class SegmentAllocateActionTest
@Test
public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
@ -615,7 +615,7 @@ public class SegmentAllocateActionTest
@Test
public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
@ -644,7 +644,7 @@ public class SegmentAllocateActionTest
@Test
public void testCannotDoAnythingWithSillyQueryGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, Granularities.DAY, Granularities.HOUR, "s1", null);
@ -655,7 +655,7 @@ public class SegmentAllocateActionTest
@Test
public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(

View File

@ -91,7 +91,7 @@ public class SegmentInsertActionTest
@Test
public void testSimple() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
@ -121,7 +121,7 @@ public class SegmentInsertActionTest
@Test
public void testFailBadVersion() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

View File

@ -0,0 +1,115 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
public class SegmentListActionsTest
{
private static final Interval INTERVAL = Intervals.of("2017-10-01/2017-10-15");
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private Task task;
private Set<DataSegment> expectedUnusedSegments;
private Set<DataSegment> expectedUsedSegments;
@Before
public void setup() throws IOException
{
task = NoopTask.create();
actionTestKit.getTaskLockbox().add(task);
expectedUnusedSegments = new HashSet<>();
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), "1"));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), "1"));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1"));
actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUnusedSegments);
expectedUnusedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
expectedUsedSegments = new HashSet<>();
expectedUsedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), "2"));
expectedUsedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), "2"));
expectedUsedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "2"));
actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUsedSegments);
expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
expectedUnusedSegments.forEach(
s -> actionTestKit.getMetadataSegmentManager().removeSegment(task.getDataSource(), s.getIdentifier())
);
}
private DataSegment createSegment(Interval interval, String version)
{
return new DataSegment(
task.getDataSource(),
interval,
version,
null,
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
NoneShardSpec.instance(),
Integer.valueOf(version),
1
);
}
@Test
public void testSegmentListUsedAction() throws IOException, InterruptedException
{
final SegmentListUsedAction action = new SegmentListUsedAction(
task.getDataSource(),
null,
ImmutableList.of(INTERVAL)
);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUsedSegments, resultSegments);
}
@Test
public void testSegmentListUnusedAction() throws InterruptedException, IOException
{
final SegmentListUnusedAction action = new SegmentListUnusedAction(task.getDataSource(), INTERVAL);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
}
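
For context on how these actions run outside the test kit: a task submits them through its action client and gets the typed result back. A sketch under assumptions (method and data-source names are illustrative; the three-argument SegmentListUsedAction constructor matches the usage above):

import java.io.IOException;
import java.util.List;

import com.google.common.collect.ImmutableList;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;

public class ListUsedSegmentsSketch
{
  static List<DataSegment> listUsed(TaskToolbox toolbox, String dataSource) throws IOException
  {
    // Same action the test performs directly, here issued through the
    // task's action client the way a real task would.
    return toolbox.getTaskActionClient().submit(
        new SegmentListUsedAction(dataSource, null, ImmutableList.of(Intervals.of("2017-10-01/2017-10-15")))
    );
  }
}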

View File

@ -31,7 +31,7 @@ import java.util.List;
/**
*/
public class SegmentListUsedActionTest
public class SegmentListUsedActionSerdeTest
{
private static final ObjectMapper MAPPER = TestUtil.MAPPER;

View File

@ -89,7 +89,7 @@ public class SegmentTransactionalInsertActionTest
@Test
public void testTransactional() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
@ -130,7 +130,7 @@ public class SegmentTransactionalInsertActionTest
@Test
public void testFailTransactional() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
@ -149,7 +149,7 @@ public class SegmentTransactionalInsertActionTest
@Test
public void testFailBadVersion() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final Task task = new NoopTask(null, null, 0, 0, null, null, null);
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

View File

@ -101,4 +101,50 @@ public class TaskActionPreconditionsTest
Assert.assertEquals(3, locks.size());
Assert.assertTrue(TaskActionPreconditions.isLockCoversSegments(task, lockbox, segments));
}
@Test
public void testCheckLargeLockCoversSegments() throws Exception
{
final List<Interval> intervals = ImmutableList.of(
Intervals.of("2017-01-01/2017-01-04")
);
final Map<Interval, TaskLock> locks = intervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
)
);
Assert.assertEquals(1, locks.size());
Assert.assertTrue(TaskActionPreconditions.isLockCoversSegments(task, lockbox, segments));
}
@Test
public void testCheckLockCoversSegmentsWithOverlappedIntervals() throws Exception
{
final List<Interval> lockIntervals = ImmutableList.of(
Intervals.of("2016-12-31/2017-01-01"),
Intervals.of("2017-01-01/2017-01-02"),
Intervals.of("2017-01-02/2017-01-03")
);
final Map<Interval, TaskLock> locks = lockIntervals.stream().collect(
Collectors.toMap(
Function.identity(),
interval -> {
final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, interval).getTaskLock();
Assert.assertNotNull(lock);
return lock;
}
)
);
Assert.assertEquals(3, locks.size());
Assert.assertFalse(TaskActionPreconditions.isLockCoversSegments(task, lockbox, segments));
}
}
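
Worth noting about the floor-entry check these tests pin down: only the lock with the greatest start at or before the segment's start is ever consulted, so a misaligned or overlapping lock can make the check fail even when another lock would contain the segment. A self-contained sketch of that shadowing effect, with long millis standing in for DateTimes:

import java.util.NavigableMap;
import java.util.TreeMap;

public class LockShadowingSketch
{
  public static void main(String[] args)
  {
    // Locks keyed by interval start: lock A covers [0, 10), lock B covers [5, 8).
    final NavigableMap<Long, long[]> locks = new TreeMap<>();
    locks.put(0L, new long[]{0, 10});
    locks.put(5L, new long[]{5, 8});

    // Segment [6, 9) is fully inside lock A, but floorEntry(6) picks lock B,
    // whose start (5) is nearer; B does not contain the segment, so the
    // precondition reports "not covered" even though A would cover it.
    final long segStart = 6;
    final long segEnd = 9;
    final long[] picked = locks.floorEntry(segStart).getValue();
    final boolean covered = picked[0] <= segStart && segEnd <= picked[1];
    System.out.println(covered); // false
  }
}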

View File

@ -19,6 +19,7 @@
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.config.TaskStorageConfig;
@ -28,8 +29,11 @@ import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
@ -44,6 +48,7 @@ public class TaskActionTestKit extends ExternalResource
private TaskLockbox taskLockbox;
private TestDerbyConnector testDerbyConnector;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private MetadataSegmentManager metadataSegmentManager;
private TaskActionToolbox taskActionToolbox;
public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
@ -71,6 +76,11 @@ public class TaskActionTestKit extends ExternalResource
return metadataStorageCoordinator;
}
public MetadataSegmentManager getMetadataSegmentManager()
{
return metadataSegmentManager;
}
public TaskActionToolbox getTaskActionToolbox()
{
return taskActionToolbox;
@ -85,11 +95,18 @@ public class TaskActionTestKit extends ExternalResource
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig)
);
final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
new TestUtils().getTestObjectMapper(),
objectMapper,
metadataStorageTablesConfig,
testDerbyConnector
);
metadataSegmentManager = new SQLMetadataSegmentManager(
objectMapper,
Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector
);
taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
@ -113,6 +130,7 @@ public class TaskActionTestKit extends ExternalResource
taskLockbox = null;
testDerbyConnector = null;
metadataStorageCoordinator = null;
metadataSegmentManager = null;
taskActionToolbox = null;
}
}

View File

@ -27,7 +27,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
@ -120,7 +119,6 @@ public class CompactionTaskTest
new DoubleDimensionSchema(MIXED_TYPE_COLUMN)
);
private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
private static final Injector INJECTOR = GuiceInjectors.makeStartupInjector();
private static Map<String, DimensionSchema> DIMENSIONS;
private static Map<String, AggregatorFactory> AGGREGATORS;
@ -200,7 +198,7 @@ public class CompactionTaskTest
guiceIntrospector, objectMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
objectMapper.setInjectableValues(new GuiceInjectableValues(INJECTOR));
objectMapper.setInjectableValues(new GuiceInjectableValues(GuiceInjectors.makeStartupInjector()));
objectMapper.registerModule(
new SimpleModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))
);
@ -261,7 +259,6 @@ public class CompactionTaskTest
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
INJECTOR,
objectMapper
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
@ -288,7 +285,6 @@ public class CompactionTaskTest
null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
INJECTOR,
objectMapper
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
@ -310,7 +306,6 @@ public class CompactionTaskTest
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -355,7 +350,6 @@ public class CompactionTaskTest
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
customSpec,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
@ -370,7 +364,6 @@ public class CompactionTaskTest
new SegmentProvider(SEGMENTS),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@ -382,7 +375,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOException, SegmentLoadingException
{
expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
expectedException.expectMessage(CoreMatchers.containsString("are different from the currently used segments"));
expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments"));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
segments.remove(0);
@ -391,7 +384,6 @@ public class CompactionTaskTest
new SegmentProvider(segments),
null,
TUNING_CONFIG,
INJECTOR,
objectMapper
);
}

View File

@ -30,7 +30,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Module;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.InputRow;
@ -51,8 +50,11 @@ import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.Intervals;
@ -71,8 +73,6 @@ import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -85,6 +85,8 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -124,24 +126,30 @@ public class IngestSegmentFirehoseFactoryTest
private static final ObjectMapper MAPPER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
private static final TaskStorage TASK_STORAGE;
private static final TaskLockbox TASK_LOCKBOX;
private static final Task TASK;
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(TestHelper.getJsonMapper());
INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
INDEX_IO = testUtils.getTestIndexIO();
}
@Parameterized.Parameters(name = "{1}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final IndexSpec indexSpec = new IndexSpec();
final HeapMemoryTaskStorage ts = new HeapMemoryTaskStorage(
TASK_STORAGE = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
}
);
TASK_LOCKBOX = new TaskLockbox(TASK_STORAGE);
TASK = NoopTask.create();
TASK_LOCKBOX.add(TASK);
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final IndexSpec indexSpec = new IndexSpec();
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(JodaUtils.MIN_INSTANT)
.withDimensionsSpec(ROW_PARSER)
@ -164,7 +172,6 @@ public class IngestSegmentFirehoseFactoryTest
}
INDEX_MERGER_V9.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
{
final private Set<DataSegment> published = Sets.newHashSet();
@ -208,8 +215,8 @@ public class IngestSegmentFirehoseFactoryTest
}
};
final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory(
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter(), EasyMock.createMock(SupervisorManager.class))
TASK_STORAGE,
new TaskActionToolbox(TASK_LOCKBOX, mdc, newMockEmitter(), EasyMock.createMock(SupervisorManager.class))
);
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);
@ -330,32 +337,24 @@ public class IngestSegmentFirehoseFactoryTest
null,
ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME)
)) {
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
TASK.getDataSource(),
Intervals.ETERNITY,
new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
dim_names,
metric_names,
INDEX_IO
);
factory.setTaskToolbox(taskToolboxFactory.build(TASK));
values.add(
new Object[]{
new IngestSegmentFirehoseFactory(
DATA_SOURCE_NAME,
Intervals.ETERNITY,
new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
dim_names,
metric_names,
Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory);
}
}
),
INDEX_IO
),
StringUtils.format(
"DimNames[%s]MetricNames[%s]ParserDimNames[%s]",
dim_names == null ? "null" : "dims",
metric_names == null ? "null" : "metrics",
parser == ROW_PARSER ? "dims" : "null"
),
factory,
parser
}
);
@ -400,8 +399,8 @@ public class IngestSegmentFirehoseFactoryTest
}
public IngestSegmentFirehoseFactoryTest(
IngestSegmentFirehoseFactory factory,
String testName,
IngestSegmentFirehoseFactory factory,
InputRowParser rowParser
)
{
@ -478,7 +477,7 @@ public class IngestSegmentFirehoseFactoryTest
}
@BeforeClass
public static void setUpStatic() throws IOException
public static void setUpStatic() throws IOException, InterruptedException
{
for (int i = 0; i < MAX_SHARD_NUMBER; ++i) {
segmentSet.add(buildSegment(i));
@ -512,7 +511,7 @@ public class IngestSegmentFirehoseFactoryTest
@Test
public void sanityTest()
{
Assert.assertEquals(DATA_SOURCE_NAME, factory.getDataSource());
Assert.assertEquals(TASK.getDataSource(), factory.getDataSource());
if (factory.getDimensions() != null) {
Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray());
}

View File

@ -27,10 +27,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
@ -40,13 +36,17 @@ import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
@ -59,11 +59,11 @@ import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.segment.transform.TransformSpec;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
@ -294,6 +294,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
} else {
throw new IllegalArgumentException("WTF");
}
} else if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(TaskLockType.EXCLUSIVE, null, DATA_SOURCE, Intervals.of("2000/2001"), "v1", 0);
} else {
throw new UnsupportedOperationException();
}
@ -343,25 +345,15 @@ public class IngestSegmentFirehoseFactoryTimelineTest
null,
null
);
final Injector injector = Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskToolboxFactory.class).toInstance(taskToolboxFactory);
}
}
);
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
DATA_SOURCE,
testCase.interval,
new NoopDimFilter(),
Arrays.asList(DIMENSIONS),
Arrays.asList(METRICS),
injector,
INDEX_IO
);
factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)));
constructors.add(
new Object[]{

View File

@ -224,7 +224,7 @@ public class OverlordTest
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
final String taskId_0 = "0";
NoopTask task_0 = new NoopTask(taskId_0, 0, 0, null, null, null);
NoopTask task_0 = new NoopTask(taskId_0, null, 0, 0, null, null, null);
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
@ -257,7 +257,7 @@ public class OverlordTest
// Manually insert task in taskStorage
// Verifies sync from storage
final String taskId_1 = "1";
NoopTask task_1 = new NoopTask(taskId_1, 0, 0, null, null, null);
NoopTask task_1 = new NoopTask(taskId_1, null, 0, 0, null, null, null);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));
// Wait for task runner to run task_1
runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();

View File

@ -78,7 +78,7 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
private final String requestMethod;
private final ResourceFilter resourceFilter;
private final Injector injector;
private final Task noopTask = new NoopTask(null, 0, 0, null, null, null);
private final Task noopTask = new NoopTask(null, null, 0, 0, null, null, null);
private static boolean mockedOnceTsqa;
private static boolean mockedOnceSM;

View File

@ -41,7 +41,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
);
NoopTask noopTask = new NoopTask(null, 1, 0, null, null, null)
NoopTask noopTask = new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -111,7 +111,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
);
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -134,7 +134,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
);
Assert.assertNull(worker);
}

View File

@ -87,7 +87,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -122,7 +122,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -158,7 +158,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -194,7 +194,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -280,7 +280,7 @@ public class EqualDistributionWorkerSelectStrategyTest
private static NoopTask createDummyTask(final String dataSource)
{
return new NoopTask(null, 1, 0, null, null, null)
return new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()

View File

@ -57,7 +57,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
@ -94,7 +94,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
);
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -117,7 +117,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, 1, 0, null, null, null)
new NoopTask(null, null, 1, 0, null, null, null)
);
Assert.assertNull(worker);
}