Speed up ParallelIndexSupervisorTask tests (#8633)

Previously, the forceGuaranteedRollup validation tests for
ParallelIndexSupervisorTask were being run twice unnecessarily. Move them
from MultiPhaseParallelIndexingTest into ParallelIndexSupervisorTaskSerdeTest,
which also gains reusable builders for constructing test tasks and ingestion
specs.
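
The duplication presumably comes from JUnit parameterization: every @Test
method in a @RunWith(Parameterized.class) class runs once per parameter set,
even when the test never reads the parameter. A minimal sketch of that
failure mode, assuming that is the mechanism here; the class name and
parameter values below are illustrative, not taken from this commit:

    import java.util.Arrays;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    // Illustrative stand-in for a parameterized test class like
    // MultiPhaseParallelIndexingTest.
    @RunWith(Parameterized.class)
    public class MultiPhaseStyleTest
    {
      @Parameterized.Parameters
      public static Iterable<Object[]> params()
      {
        // Two parameter sets => every @Test below runs twice.
        return Arrays.asList(new Object[]{"timeChunk"}, new Object[]{"segment"});
      }

      private final String lockGranularity;

      public MultiPhaseStyleTest(String lockGranularity)
      {
        this.lockGranularity = lockGranularity;
      }

      @Test
      public void testValidationIndependentOfParameter()
      {
        // Never reads lockGranularity, so both parameterized runs are
        // identical; hosting this test in an unparameterized class runs
        // it only once.
      }
    }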
Chi Cao Minh 2019-10-08 19:56:12 -07:00 committed by Fangjin Yang
parent 0853273091
commit b6b5517c20
3 changed files with 209 additions and 159 deletions

MultiPhaseParallelIndexingTest.java

@@ -154,64 +154,6 @@ public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervi
assertHashedPartition(publishedSegments);
}
@Test
public void testMissingIntervals()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(
"forceGuaranteedRollup is set but intervals is missing in granularitySpec"
);
newTask(
null,
new ParallelIndexIOConfig(
new LocalFirehoseFactory(inputDir, "test_*", null),
false
),
new HashedPartitionsSpec(null, 2, null)
);
}
@Test
public void testMissingNumShards()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(
"forceGuaranteedRollup is set but numShards is missing in partitionsSpec"
);
newTask(
Intervals.of("2017/2018"),
Granularities.DAY,
new ParallelIndexIOConfig(new LocalFirehoseFactory(inputDir, "test_*", null), false),
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
new HashedPartitionsSpec(null, null, null),
null,
null,
null,
true,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
);
}
private Set<DataSegment> runTestTask(Interval interval, HashedPartitionsSpec partitionsSpec) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(

ParallelIndexSupervisorTaskSerdeTest.java

@@ -19,16 +19,21 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
@@ -38,6 +43,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -45,127 +52,224 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ParallelIndexSupervisorTaskSerdeTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
new ArrayList<>(),
new ArrayList<>()
),
null,
Arrays.asList("ts", "dim", "val"),
false,
0
);
private final TestUtils testUtils = new TestUtils();
@Test
public void testSerde() throws IOException
private static ObjectMapper createObjectMapper()
{
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
TestUtils testUtils = new TestUtils();
ObjectMapper objectMapper = testUtils.getTestObjectMapper();
objectMapper.registerSubtypes(
new NamedType(LocalFirehoseFactory.class, "local")
);
final ParallelIndexSupervisorTask task = newTask(
objectMapper,
Intervals.of("2018/2019")
);
final String json = objectMapper.writeValueAsString(task);
Assert.assertEquals(task, objectMapper.readValue(json, Task.class));
return objectMapper;
}
private ParallelIndexSupervisorTask newTask(
ObjectMapper objectMapper,
Interval interval
)
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void serde() throws IOException
{
// set up ingestion spec
final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
objectMapper.convertValue(
new StringInputRowParser(
DEFAULT_PARSE_SPEC,
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularities.DAY,
Granularities.MINUTE,
interval == null ? null : Collections.singletonList(interval)
),
null,
objectMapper
),
new ParallelIndexIOConfig(
new LocalFirehoseFactory(new File("tmp"), "test_*", null),
false
),
new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
2,
null,
null,
null,
null,
null,
null,
null,
null,
null
ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTaskBuilder()
.ingestionSpec(
new ParallelIndexIngestionSpecBuilder()
.inputIntervals(Collections.singletonList(Intervals.of("2018/2019")))
.build()
)
.build();
String json = OBJECT_MAPPER.writeValueAsString(task);
Assert.assertEquals(task, OBJECT_MAPPER.readValue(json, Task.class));
}
@Test
public void forceGuaranteedRollupWithMissingIntervals()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(
"forceGuaranteedRollup is set but intervals is missing in granularitySpec"
);
// set up test tools
return new ParallelIndexSupervisorTask(
"taskId",
new TaskResource("group", 1),
ingestionSpec,
new HashMap<>(),
new NoopIndexingServiceClient(),
new NoopChatHandlerProvider(),
new AuthorizerMapper(Collections.emptyMap()),
new DropwizardRowIngestionMetersFactory(),
new TestAppenderatorsManager()
Integer numShards = 2;
new ParallelIndexSupervisorTaskBuilder()
.ingestionSpec(
new ParallelIndexIngestionSpecBuilder()
.forceGuaranteedRollup(true)
.partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
.build()
)
.build();
}
@Test
public void forceGuaranteedRollupWithMissingNumShards()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage(
"forceGuaranteedRollup is set but numShards is missing in partitionsSpec"
);
Integer numShards = null;
new ParallelIndexSupervisorTaskBuilder()
.ingestionSpec(
new ParallelIndexIngestionSpecBuilder()
.forceGuaranteedRollup(true)
.partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
.build()
)
.build();
}
private static class ParallelIndexSupervisorTaskBuilder
{
private static final String ID = "taskId";
private final TaskResource taskResource = new TaskResource("group", 1);
private final Map<String, Object> context = Collections.emptyMap();
private final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
private final ChatHandlerProvider chatHandlerProvider = new NoopChatHandlerProvider();
private final AuthorizerMapper authorizerMapper = new AuthorizerMapper(Collections.emptyMap());
private final RowIngestionMetersFactory rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory();
private final AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
private ParallelIndexIngestionSpec ingestionSpec;
ParallelIndexSupervisorTaskBuilder ingestionSpec(ParallelIndexIngestionSpec ingestionSpec)
{
this.ingestionSpec = ingestionSpec;
return this;
}
ParallelIndexSupervisorTask build()
{
return new ParallelIndexSupervisorTask(
ID,
taskResource,
ingestionSpec,
context,
indexingServiceClient,
chatHandlerProvider,
authorizerMapper,
rowIngestionMetersFactory,
appenderatorsManager
);
}
}
private static class ParallelIndexIngestionSpecBuilder
{
private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
new ArrayList<>(),
new ArrayList<>()
),
null,
Arrays.asList("ts", "dim", "val"),
false,
0
);
private static final TypeReference<Map<String, Object>> PARSER_TYPE = new TypeReference<Map<String, Object>>()
{
};
private final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
new LocalFirehoseFactory(new File("tmp"), "test_*", null),
false
);
// For dataSchema.granularitySpec
@Nullable
private List<Interval> inputIntervals = null;
// For tuningConfig
@Nullable
private Boolean forceGuaranteedRollup = null;
@Nullable
PartitionsSpec partitionsSpec = null;
ParallelIndexIngestionSpecBuilder inputIntervals(List<Interval> inputIntervals)
{
this.inputIntervals = inputIntervals;
return this;
}
@SuppressWarnings("SameParameterValue")
ParallelIndexIngestionSpecBuilder forceGuaranteedRollup(boolean forceGuaranteedRollup)
{
this.forceGuaranteedRollup = forceGuaranteedRollup;
return this;
}
ParallelIndexIngestionSpecBuilder partitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
return this;
}
ParallelIndexIngestionSpec build()
{
DataSchema dataSchema = new DataSchema(
"dataSource",
OBJECT_MAPPER.convertValue(
new StringInputRowParser(PARSE_SPEC, null),
PARSER_TYPE
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, inputIntervals),
null,
OBJECT_MAPPER
);
ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
null,
null,
null,
null,
partitionsSpec,
null,
null,
null,
forceGuaranteedRollup,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);
return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
}
}
}

LocalFirehoseFactory.java

@@ -32,12 +32,14 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.utils.CompressionUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
/**
* Firehose that reads data from files on local disk
*/
public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
{
@@ -45,6 +47,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
private final File baseDir;
private final String filter;
@Nullable
private final StringInputRowParser parser;
@JsonCreator
@@ -52,7 +55,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
// Backwards compatible
@JsonProperty("parser") StringInputRowParser parser
@Nullable @JsonProperty("parser") StringInputRowParser parser
)
{
this.baseDir = baseDir;
@@ -73,6 +76,7 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
}
@JsonProperty
@Nullable
public StringInputRowParser getParser()
{
return parser;
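
With the parser marked @Nullable end to end, a firehose spec that omits the
optional "parser" field round-trips cleanly. A minimal sketch of that
behavior, assuming the "local" subtype registration used in the serde test
above; the bare ObjectMapper setup is illustrative, not how Druid wires its
mapper in production:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.jsontype.NamedType;

    import org.apache.druid.data.input.FirehoseFactory;
    import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;

    public class LocalFirehoseFactorySerdeSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerSubtypes(new NamedType(LocalFirehoseFactory.class, "local"));

        // No "parser" key in the JSON: the @Nullable constructor parameter
        // accepts its absence, and getParser() simply returns null.
        String json = "{\"type\":\"local\",\"baseDir\":\"tmp\",\"filter\":\"test_*\"}";
        FirehoseFactory factory = mapper.readValue(json, FirehoseFactory.class);
        System.out.println(((LocalFirehoseFactory) factory).getParser()); // prints null
      }
    }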