Remove ParseSpec.toInputFormat() (#9815)

* Remove toInputFormat() from ParseSpec

* fix test
Jihoon Son 2020-05-05 11:17:57 -07:00 committed by GitHub
parent c6caae9a24
commit 964a1fc9df
24 changed files with 594 additions and 472 deletions
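With the removal, ingestion code can no longer derive an InputFormat from a legacy ParseSpec at runtime; the format must be constructed and supplied explicitly, as the updated call sites and tests below do. A minimal before/after sketch (a hypothetical snippet, not part of the diff; the column names are illustrative and the CsvInputFormat arguments mirror calls visible in this commit):

import java.util.Arrays;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CsvInputFormat;

public class ToInputFormatMigrationSketch
{
  public static void main(String[] args)
  {
    // Before this commit, callers could fall back to the legacy spec:
    //   InputFormat format = csvParseSpec.toInputFormat();
    // After this commit, the format is built directly, e.g. for CSV:
    final InputFormat format = new CsvInputFormat(
        Arrays.asList("ts", "dim", "val"), // columns (illustrative)
        null,                              // listDelimiter
        false,                             // hasHeaderRow
        false,                             // findColumnsFromHeader
        0                                  // skipHeaderRows
    );
    System.out.println(format);
  }
}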

CSVParseSpec.java

@@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.CSVParser;
 import org.apache.druid.java.util.common.parsers.Parser;
@@ -97,12 +96,6 @@ public class CSVParseSpec extends ParseSpec
     return new CSVParser(listDelimiter, columns, hasHeaderRow, skipHeaderRows);
   }
 
-  @Override
-  public InputFormat toInputFormat()
-  {
-    return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
-  }
-
   @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {

DelimitedParseSpec.java

@@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.DelimitedParser;
 import org.apache.druid.java.util.common.parsers.Parser;
@@ -124,12 +123,6 @@ public class DelimitedParseSpec extends ParseSpec
     );
   }
 
-  @Override
-  public InputFormat toInputFormat()
-  {
-    return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows);
-  }
-
   @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {

JSONParseSpec.java

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.JSONPathParser;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.Parser;
@@ -68,12 +67,6 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
     return new JSONPathParser(getFlattenSpec(), objectMapper);
   }
 
-  @Override
-  public InputFormat toInputFormat()
-  {
-    return new JsonInputFormat(getFlattenSpec(), getFeatureSpec());
-  }
-
   @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {

ParseSpec.java

@@ -23,13 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Preconditions;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.parsers.Parser;
-
-import javax.annotation.Nullable;
 
 @Deprecated
 @ExtensionPoint
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "format")
@@ -71,16 +68,6 @@ public abstract class ParseSpec
     return null;
   }
 
-  /**
-   * Returns null if it's not implemented yet.
-   * This method (and maybe this class) will be removed in favor of {@link InputFormat} in the future.
-   */
-  @Nullable
-  public InputFormat toInputFormat()
-  {
-    return null;
-  }
-
   @PublicApi
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {

FirehoseFactoryToInputSourceAdaptorTest.java

@@ -52,7 +52,7 @@ public class FirehoseFactoryToInputSourceAdaptorTest
     }
     final TestFirehoseFactory firehoseFactory = new TestFirehoseFactory(lines);
     final StringInputRowParser inputRowParser = new StringInputRowParser(
-        new UnimplementedInputFormatCsvParseSpec(
+        new CSVParseSpec(
            new TimestampSpec(null, "yyyyMMdd", null),
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "name", "score"))),
            ",",
@@ -95,28 +95,6 @@ public class FirehoseFactoryToInputSourceAdaptorTest
     }
   }
 
-  private static class UnimplementedInputFormatCsvParseSpec extends CSVParseSpec
-  {
-    private UnimplementedInputFormatCsvParseSpec(
-        TimestampSpec timestampSpec,
-        DimensionsSpec dimensionsSpec,
-        String listDelimiter,
-        List<String> columns,
-        boolean hasHeaderRow,
-        int skipHeaderRows
-    )
-    {
-      super(timestampSpec, dimensionsSpec, listDelimiter, columns, hasHeaderRow, skipHeaderRows);
-    }
-
-    @Nullable
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
-
   private static class TestFirehoseFactory implements FiniteFirehoseFactory<StringInputRowParser, Object>
   {
     private final List<String> lines;

KafkaSupervisor.java

@@ -194,9 +194,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat(
-            spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
-        )
+        ioConfig.getInputFormat()
     );
   }

KinesisSupervisor.java

@@ -140,9 +140,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat(
-            spec.getDataSchema().getParser() == null ? null : spec.getDataSchema().getParser().getParseSpec()
-        ),
+        ioConfig.getInputFormat(),
         ioConfig.getEndpoint(),
         ioConfig.getRecordsPerFetch(),
         ioConfig.getFetchDelayMillis(),

IndexTask.java

@@ -44,7 +44,6 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.Rows;
 import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.IngestionState;
@@ -1046,10 +1045,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   private static InputFormat getInputFormat(IndexIngestionSpec ingestionSchema)
   {
-    final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
-    return ingestionSchema.getIOConfig().getNonNullInputFormat(
-        parser == null ? null : parser.getParseSpec()
-    );
+    return ingestionSchema.getIOConfig().getNonNullInputFormat();
   }
 
   public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
@@ -1184,13 +1180,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
-  public InputFormat getNonNullInputFormat(@Nullable ParseSpec parseSpec)
+  public InputFormat getNonNullInputFormat()
   {
-    if (inputFormat == null) {
-      return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
-    } else {
-      return inputFormat;
-    }
+    return Preconditions.checkNotNull(inputFormat, "inputFormat");
   }
 
   @Override

ParallelIndexPhaseRunner.java

@@ -30,7 +30,6 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -196,19 +195,8 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
         if (lastStatus != null) {
           LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
         } else {
-          final SinglePhaseSubTaskSpec spec =
-              (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
-          final InputRowParser inputRowParser = spec.getIngestionSpec().getDataSchema().getParser();
-          LOG.error(
-              "Failed to run sub tasks for inputSplits[%s]",
-              getSplitsIfSplittable(
-                  spec.getIngestionSpec().getIOConfig().getNonNullInputSource(inputRowParser),
-                  spec.getIngestionSpec().getIOConfig().getNonNullInputFormat(
-                      inputRowParser == null ? null : inputRowParser.getParseSpec()
-                  ),
-                  tuningConfig.getSplitHintSpec()
-              )
-          );
+          final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
+          LOG.error("Failed to run sub tasks for inputSplits[%s]", spec.getInputSplit());
         }
         break;
       default:

ParallelIndexSupervisorTask.java

@@ -33,7 +33,6 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
-import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -1001,10 +1000,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   static InputFormat getInputFormat(ParallelIndexIngestionSpec ingestionSchema)
   {
-    final InputRowParser parser = ingestionSchema.getDataSchema().getParser();
-    return ingestionSchema.getIOConfig().getNonNullInputFormat(
-        parser == null ? null : parser.getParseSpec()
-    );
+    return ingestionSchema.getIOConfig().getNonNullInputFormat();
   }
 
   /**

SeekableStreamIndexTaskIOConfig.java

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.joda.time.DateTime;
@@ -127,8 +126,8 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
   }
 
   @Nullable
-  public InputFormat getInputFormat(ParseSpec parseSpec)
+  public InputFormat getInputFormat()
   {
-    return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat;
+    return inputFormat;
   }
 }

SeekableStreamIndexTaskRunner.java

@@ -255,7 +255,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             .map(AggregatorFactory::getName)
             .collect(Collectors.toList())
     );
-    this.inputFormat = ioConfig.getInputFormat(parser == null ? null : parser.getParseSpec());
+    this.inputFormat = ioConfig.getInputFormat();
     this.parser = parser;
     this.authorizerMapper = authorizerMapper;
    this.chatHandlerProvider = chatHandlerProvider;

SeekableStreamSamplerSpec.java

@@ -98,7 +98,7 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
         ioConfig.isUseEarliestSequenceNumber()
     );
     inputFormat = Preconditions.checkNotNull(
-        ioConfig.getInputFormat(null),
+        ioConfig.getInputFormat(),
         "[spec.ioConfig.inputFormat] is required"
     );
   }

SeekableStreamSupervisorIOConfig.java

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -102,22 +101,12 @@ public abstract class SeekableStreamSupervisorIOConfig
   }
 
   @Nullable
-  @JsonProperty("inputFormat")
-  private InputFormat getGivenInputFormat()
+  @JsonProperty()
+  public InputFormat getInputFormat()
   {
     return inputFormat;
   }
 
-  @Nullable
-  public InputFormat getInputFormat(@Nullable ParseSpec parseSpec)
-  {
-    if (inputFormat == null) {
-      return Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat();
-    } else {
-      return inputFormat;
-    }
-  }
-
   @JsonProperty
   public Integer getReplicas()
   {

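Note the contract change above: getInputFormat() now returns the configured value as-is, with no fallback to the parser's ParseSpec, so call sites that require a format must fail fast themselves, as SeekableStreamSamplerSpec and IndexTask.IndexIOConfig now do. A sketch of that pattern (the helper class below is illustrative, not part of this commit):

import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;

public final class InputFormats
{
  private InputFormats()
  {
  }

  // Fail fast when no inputFormat was configured, mirroring the
  // Preconditions.checkNotNull calls introduced in this commit.
  public static InputFormat requireInputFormat(InputFormat inputFormat)
  {
    return Preconditions.checkNotNull(inputFormat, "inputFormat");
  }
}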
CompactionTaskParallelRunTest.java

@@ -176,6 +176,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
         getObjectMapper(),
         tmpDir,
         CompactionTaskRunTest.DEFAULT_PARSE_SPEC,
+        null,
         new UniformGranularitySpec(
             Granularities.HOUR,
             Granularities.MINUTE,

CompactionTaskRunTest.java

@@ -381,6 +381,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         tmpDir,
         DEFAULT_PARSE_SPEC,
+        null,
         new UniformGranularitySpec(
             Granularities.HOUR,
             Granularities.MINUTE,
@@ -791,6 +792,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         tmpDir,
         DEFAULT_PARSE_SPEC,
+        null,
         new UniformGranularitySpec(
             Granularities.HOUR,
             Granularities.MINUTE,

AbstractMultiPhaseParallelIndexingTest.java

@@ -19,10 +19,14 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
@@ -55,6 +59,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 import org.junit.Assert;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Collections;
 import java.util.List;
@@ -82,8 +87,16 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
     this.useInputFormatApi = useInputFormatApi;
   }
 
+  boolean isUseInputFormatApi()
+  {
+    return useInputFormatApi;
+  }
+
   Set<DataSegment> runTestTask(
-      ParseSpec parseSpec,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable InputFormat inputFormat,
+      @Nullable ParseSpec parseSpec,
       Interval interval,
       File inputDir,
       String filter,
@@ -93,6 +106,9 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
   )
   {
     final ParallelIndexSupervisorTask task = newTask(
+        timestampSpec,
+        dimensionsSpec,
+        inputFormat,
         parseSpec,
         interval,
         inputDir,
@@ -108,7 +124,10 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
   }
 
   private ParallelIndexSupervisorTask newTask(
-      ParseSpec parseSpec,
+      @Nullable TimestampSpec timestampSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable InputFormat inputFormat,
+      @Nullable ParseSpec parseSpec,
       Interval interval,
       File inputDir,
       String filter,
@@ -154,17 +173,18 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
     final ParallelIndexIngestionSpec ingestionSpec;
     if (useInputFormatApi) {
+      Preconditions.checkArgument(parseSpec == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
           null,
           new LocalInputSource(inputDir, filter),
-          parseSpec.toInputFormat(),
+          inputFormat,
          false
      );
       ingestionSpec = new ParallelIndexIngestionSpec(
           new DataSchema(
               "dataSource",
-              parseSpec.getTimestampSpec(),
-              parseSpec.getDimensionsSpec(),
+              timestampSpec,
+              dimensionsSpec,
              new AggregatorFactory[]{
                  new LongSumAggregatorFactory("val", "val")
              },
@@ -175,6 +195,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
           tuningConfig
       );
     } else {
+      Preconditions.checkArgument(inputFormat == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
           new LocalFirehoseFactory(inputDir, filter, null),
           false

AbstractParallelIndexSupervisorTaskTest.java

@@ -38,6 +38,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -131,7 +132,13 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       false,
       0
   );
-  static final InputFormat DEFAULT_INPUT_FORMAT = DEFAULT_PARSE_SPEC.toInputFormat();
+  static final InputFormat DEFAULT_INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim", "val"),
+      null,
+      false,
+      false,
+      0
+  );
   static final ParallelIndexTuningConfig DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING = new ParallelIndexTuningConfig(
       null,
       null,

HashPartitionMultiPhaseParallelIndexingTest.java

@@ -20,7 +20,9 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -54,18 +56,25 @@ import java.util.Set;
 @RunWith(Parameterized.class)
 public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
 {
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec("ts", "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))
+  );
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
-      new TimestampSpec(
-          "ts",
-          "auto",
-          null
-      ),
-      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2"))),
+      TIMESTAMP_SPEC,
+      DIMENSIONS_SPEC,
       null,
       Arrays.asList("ts", "dim1", "dim2", "val"),
       false,
       0
   );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList("ts", "dim1", "dim2", "val"),
+      null,
+      false,
+      false,
+      0
+  );
   private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
@@ -112,15 +121,34 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
   @Test
   public void testRun() throws Exception
   {
-    final Set<DataSegment> publishedSegments = runTestTask(
-        PARSE_SPEC,
-        INTERVAL_TO_INDEX,
-        inputDir,
-        "test_*",
-        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
-        MAX_NUM_CONCURRENT_SUB_TASKS,
-        TaskState.SUCCESS
-    );
+    final Set<DataSegment> publishedSegments;
+    if (isUseInputFormatApi()) {
+      publishedSegments = runTestTask(
+          TIMESTAMP_SPEC,
+          DIMENSIONS_SPEC,
+          INPUT_FORMAT,
+          null,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          "test_*",
+          new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+          MAX_NUM_CONCURRENT_SUB_TASKS,
+          TaskState.SUCCESS
+      );
+    } else {
+      publishedSegments = runTestTask(
+          null,
+          null,
+          null,
+          PARSE_SPEC,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          "test_*",
+          new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+          MAX_NUM_CONCURRENT_SUB_TASKS,
+          TaskState.SUCCESS
+      );
+    }
     assertHashedPartition(publishedSegments);
   }

RangePartitionMultiPhaseParallelIndexingTest.java

@@ -25,7 +25,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import org.apache.druid.common.config.NullValueHandlingConfig;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -79,18 +81,25 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   private static final String LIST_DELIMITER = "|";
   private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
   private static final String TEST_FILE_NAME_PREFIX = "test_";
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(TIME, "auto", null);
+  private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
+      DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))
+  );
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
-      new TimestampSpec(
-          TIME,
-          "auto",
-          null
-      ),
-      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2))),
+      TIMESTAMP_SPEC,
+      DIMENSIONS_SPEC,
       LIST_DELIMITER,
       Arrays.asList(TIME, DIM1, DIM2, "val"),
       false,
       0
   );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      false,
+      false,
+      0
+  );
 
 @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
 public static Iterable<Object[]> constructorFeeder()
@@ -192,20 +201,44 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   public void createsCorrectRangePartitions() throws Exception
   {
     int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
-    final Set<DataSegment> publishedSegments = runTestTask(
-        PARSE_SPEC,
-        INTERVAL_TO_INDEX,
-        inputDir,
-        TEST_FILE_NAME_PREFIX + "*",
-        new SingleDimensionPartitionsSpec(
-            targetRowsPerSegment,
-            null,
-            DIM1,
-            false
-        ),
-        maxNumConcurrentSubTasks,
-        useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
-    );
+    final Set<DataSegment> publishedSegments;
+    if (isUseInputFormatApi()) {
+      publishedSegments = runTestTask(
+          TIMESTAMP_SPEC,
+          DIMENSIONS_SPEC,
+          INPUT_FORMAT,
+          null,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          TEST_FILE_NAME_PREFIX + "*",
+          new SingleDimensionPartitionsSpec(
+              targetRowsPerSegment,
+              null,
+              DIM1,
+              false
+          ),
+          maxNumConcurrentSubTasks,
+          useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+      );
+    } else {
+      publishedSegments = runTestTask(
+          null,
+          null,
+          null,
+          PARSE_SPEC,
+          INTERVAL_TO_INDEX,
+          inputDir,
+          TEST_FILE_NAME_PREFIX + "*",
+          new SingleDimensionPartitionsSpec(
+              targetRowsPerSegment,
+              null,
+              DIM1,
+              false
+          ),
+          maxNumConcurrentSubTasks,
+          useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
+      );
+    }
 
     if (!useMultivalueDim) {
       assertRangePartitions(publishedSegments);

SeekableStreamIndexTaskTestBase.java

@@ -19,8 +19,6 @@
 package org.apache.druid.indexing.seekablestream;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
@@ -87,7 +85,6 @@ import org.easymock.EasyMockSupport;
 import org.joda.time.Interval;
 import org.junit.Assert;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -149,12 +146,12 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
   static {
     OBJECT_MAPPER = new TestUtils().getTestObjectMapper();
-    OBJECT_MAPPER.registerSubtypes(new NamedType(UnimplementedInputFormatJsonParseSpec.class, "json"));
+    OBJECT_MAPPER.registerSubtypes(new NamedType(JSONParseSpec.class, "json"));
     OLD_DATA_SCHEMA = new DataSchema(
         "test_ds",
         OBJECT_MAPPER.convertValue(
             new StringInputRowParser(
-                new UnimplementedInputFormatJsonParseSpec(
+                new JSONParseSpec(
                     new TimestampSpec("timestamp", "iso", null),
                     new DimensionsSpec(
                         Arrays.asList(
@@ -443,25 +440,4 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
       return segmentDescriptor;
     }
   }
-
-  private static class UnimplementedInputFormatJsonParseSpec extends JSONParseSpec
-  {
-    @JsonCreator
-    private UnimplementedInputFormatJsonParseSpec(
-        @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
-        @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
-        @JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
-        @JsonProperty("featureSpec") Map<String, Boolean> featureSpec
-    )
-    {
-      super(timestampSpec, dimensionsSpec, flattenSpec, featureSpec);
-    }
-
-    @Nullable
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
 }

StreamChunkParserTest.java

@@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
-import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -63,7 +62,12 @@ public class StreamChunkParserTest
   public void testWithParserAndNullInputformatParseProperly() throws IOException
   {
     final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
-        new NotConvertibleToInputFormatParseSpec(),
+        new JSONParseSpec(
+            TIMESTAMP_SPEC,
+            DimensionsSpec.EMPTY,
+            JSONPathSpec.DEFAULT,
+            Collections.emptyMap()
+        ),
         StringUtils.UTF8_STRING
     );
     final StreamChunkParser chunkParser = new StreamChunkParser(
@@ -109,7 +113,12 @@ public class StreamChunkParserTest
   public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException
   {
     final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
-        new NotConvertibleToInputFormatParseSpec(),
+        new JSONParseSpec(
+            TIMESTAMP_SPEC,
+            DimensionsSpec.EMPTY,
+            JSONPathSpec.DEFAULT,
+            Collections.emptyMap()
+        ),
         StringUtils.UTF8_STRING
     );
     final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat(
@@ -138,25 +147,6 @@ public class StreamChunkParserTest
     Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
   }
 
-  private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec
-  {
-    private NotConvertibleToInputFormatParseSpec()
-    {
-      super(
-          TIMESTAMP_SPEC,
-          DimensionsSpec.EMPTY,
-          JSONPathSpec.DEFAULT,
-          Collections.emptyMap()
-      );
-    }
-
-    @Override
-    public InputFormat toInputFormat()
-    {
-      return null;
-    }
-  }
-
   private static class TrackingJsonInputFormat extends JsonInputFormat
   {
     private boolean used;

SeekableStreamSupervisorStateTest.java

@@ -1027,9 +1027,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         true,
         minimumMessageTime,
         maximumMessageTime,
-        ioConfig.getInputFormat(
-            getDataSchema().getParser() == null ? null : getDataSchema().getParser().getParseSpec()
-        )
+        ioConfig.getInputFormat()
     )
     {
     };