mirror of https://github.com/apache/druid.git
allow arbitrary aggregators for reindexing with hadoop (#5294)
This commit is contained in:
parent b084075279
commit 8fae0edc95
@@ -37,13 +37,14 @@ There are other types of `inputSpec` to enable reindexing and delta ingestion.

#### `dataSource`

This is a type of `inputSpec` that reads data already stored inside Druid.
This is a type of `inputSpec` that reads data already stored inside Druid. It is used to "re-index" data and for the "delta-ingestion" described later under the `multi` type of `inputSpec`.

|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'dataSource'.|yes|
|ingestionSpec|JSON object|Specification of Druid segments to be loaded. See below.|yes|
|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit according to the size of the segments. When set to -1, Druid calculates the max split size based on the user-specified number of map tasks (`mapred.map.tasks` or `mapreduce.job.maps`). By default, one split is made per segment.|no|
|useNewAggs|Boolean|If "false", the list of aggregators in the "metricsSpec" of the Hadoop indexing task must be the same as the one used in the original indexing task that ingested the raw data. The default value is "false". This field can be set to "true" only when the "inputSpec" type is "dataSource" (not "multi") to enable arbitrary aggregators while reindexing. See below for "multi" type support for delta-ingestion.|no|

Here is what goes inside `ingestionSpec`:

@@ -76,6 +77,7 @@ For example

#### `multi`

This is a composing inputSpec used to combine other inputSpecs. It is used for delta ingestion. Please note that you can have only one `dataSource` as a child of a `multi` inputSpec.
Note that "useNewAggs" must be left at its default value of false to support delta-ingestion.

|Field|Type|Description|Required|
|-----|----|-----------|--------|
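For illustration, here is a minimal sketch of how a pure re-indexing `dataSource` inputSpec with `useNewAggs` enabled might be assembled programmatically, mirroring the shape used in the test added by this commit. The `"xyz"` datasource name and the interval string are placeholder values, not taken from the patch.

```java
// Hypothetical sketch: a "dataSource" inputSpec map enabling arbitrary aggregators for re-indexing.
// The datasource name and interval below are placeholders.
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class ReindexInputSpecExample
{
  public static Map<String, Object> reindexInputSpec()
  {
    return ImmutableMap.<String, Object>of(
        "type", "dataSource",
        "ingestionSpec", ImmutableMap.of(
            "dataSource", "xyz",
            "interval", "2014-10-22T00:00:00Z/P1D"
        ),
        "useNewAggs", true
    );
  }

  public static void main(String[] args)
  {
    System.out.println(reindexInputSpec());
  }
}
```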
@@ -37,6 +37,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;

@@ -265,7 +266,8 @@ public class IndexGeneratorJob implements Jobby
private static final HashFunction hashFunction = Hashing.murmur3_128();

private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;

private AggregatorFactory[] aggsForSerializingSegmentInputRow;
private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

@Override

@@ -274,9 +276,16 @@ public class IndexGeneratorJob implements Jobby
{
super.setup(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}

if (DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(config.getSchema().getIOConfig().getPathSpec())) {
aggsForSerializingSegmentInputRow = aggregators;
} else {
// Note: this is required for "delta-ingestion" use case where we are reading rows stored in Druid as well
// as late arriving data on HDFS etc.
aggsForSerializingSegmentInputRow = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
aggsForSerializingSegmentInputRow[i] = aggregators[i].getCombiningFactory();
}
}
typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema()
.getDataSchema()

@@ -313,7 +322,7 @@ public class IndexGeneratorJob implements Jobby
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, reportParseExceptions)
InputRowSerde.toBytes(typeHelperMap, inputRow, aggsForSerializingSegmentInputRow, reportParseExceptions)
:
InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions);
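The branch above matters because a combining factory re-aggregates the already-rolled-up metric column, while the original factory reads the raw input column. A small illustrative sketch, using the existing Druid aggregator API; the column names mirror the `visited_sum`/`visited_num` pair used in the tests, and the printed values are what we would expect rather than output quoted from the patch:

```java
// Illustrative sketch of why combining factories are used for rows read back from segments.
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningFactoryExample
{
  public static void main(String[] args)
  {
    // Original ingestion: sums the raw column "visited_num" into the metric "visited_sum".
    AggregatorFactory original = new LongSumAggregatorFactory("visited_sum", "visited_num");

    // Re-aggregating rows that were already rolled up: the combining factory reads the
    // metric column itself ("visited_sum") instead of the raw column.
    AggregatorFactory combining = original.getCombiningFactory();

    System.out.println(original.requiredFields());   // expected: [visited_num]
    System.out.println(combining.requiredFields());  // expected: [visited_sum]
  }
}
```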
@@ -41,23 +41,36 @@ import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DatasourcePathSpec implements PathSpec
{
private static final Logger logger = new Logger(DatasourcePathSpec.class);

public static final String TYPE = "dataSource";

private final ObjectMapper mapper;
private final DatasourceIngestionSpec ingestionSpec;
private final long maxSplitSize;
private final List<WindowedDataSegment> segments;

/*
Note: User would set this flag when they are doing pure re-indexing and would like to have a different
set of aggregators than the ones used during original indexing.
Default behavior is to expect same aggregators as used in original data ingestion job to support delta-ingestion
use case.
*/
private final boolean useNewAggs;
private static final String USE_NEW_AGGS_KEY = "useNewAggs";

@JsonCreator
public DatasourcePathSpec(
@JacksonInject ObjectMapper mapper,
@JsonProperty("segments") List<WindowedDataSegment> segments,
@JsonProperty("ingestionSpec") DatasourceIngestionSpec spec,
@JsonProperty("maxSplitSize") Long maxSplitSize
@JsonProperty("maxSplitSize") Long maxSplitSize,
@JsonProperty(USE_NEW_AGGS_KEY) boolean useNewAggs
)
{
this.mapper = Preconditions.checkNotNull(mapper, "null mapper");

@@ -69,6 +82,14 @@ public class DatasourcePathSpec implements PathSpec
} else {
this.maxSplitSize = maxSplitSize.longValue();
}

this.useNewAggs = useNewAggs;
}

@JsonProperty
public boolean isUseNewAggs()
{
return useNewAggs;
}

@JsonProperty

@@ -148,9 +169,16 @@ public class DatasourcePathSpec implements PathSpec
Set<String> metrics = Sets.newHashSet();
final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators();
if (cols != null) {
for (AggregatorFactory col : cols) {
metrics.add(col.getName());
if (useNewAggs) {
for (AggregatorFactory col : cols) {
metrics.addAll(col.requiredFields());
}
} else {
for (AggregatorFactory col : cols) {
metrics.add(col.getName());
}
}

}
updatedIngestionSpec = updatedIngestionSpec.withMetrics(Lists.newArrayList(metrics));
}

@@ -170,6 +198,14 @@ public class DatasourcePathSpec implements PathSpec
return job;
}

public static boolean checkIfReindexingAndIsUseAggEnabled(Map<String, Object> configuredPathSpec)
{
return TYPE.equals(configuredPathSpec.get("type")) && Boolean.parseBoolean(configuredPathSpec.getOrDefault(
USE_NEW_AGGS_KEY,
false
).toString());
}

@Override
public boolean equals(Object o)
{
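As a quick illustration of the static helper added above, a sketch of how it is expected to behave on raw pathSpec maps (only the "type" and "useNewAggs" keys matter for the check; the maps here are made up for the example):

```java
// Sketch: expected behavior of checkIfReindexingAndIsUseAggEnabled on raw pathSpec maps.
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.path.DatasourcePathSpec;

import java.util.Map;

public class UseNewAggsCheckExample
{
  public static void main(String[] args)
  {
    Map<String, Object> reindex = ImmutableMap.<String, Object>of("type", "dataSource", "useNewAggs", true);
    Map<String, Object> delta = ImmutableMap.<String, Object>of("type", "multi");
    Map<String, Object> plain = ImmutableMap.<String, Object>of("type", "dataSource");

    // true: pure re-indexing with arbitrary aggregators enabled
    System.out.println(DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(reindex));
    // false: a "multi" inputSpec (delta ingestion) never enables the new-aggregator path
    System.out.println(DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(delta));
    // false: useNewAggs defaults to false when absent
    System.out.println(DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(plain));
  }
}
```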
@@ -33,7 +33,7 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "granular_unprocessed", value = GranularUnprocessedPathSpec.class),
@JsonSubTypes.Type(name = "granularity", value = GranularityPathSpec.class),
@JsonSubTypes.Type(name = "static", value = StaticPathSpec.class),
@JsonSubTypes.Type(name = "dataSource", value = DatasourcePathSpec.class),
@JsonSubTypes.Type(name = DatasourcePathSpec.TYPE, value = DatasourcePathSpec.class),
@JsonSubTypes.Type(name = "multi", value = MultiplePathSpec.class)
})
public interface PathSpec
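The change above only replaces the string literal with the new `DatasourcePathSpec.TYPE` constant, so the registered subtype name and the `checkIfReindexingAndIsUseAggEnabled` check cannot drift apart. For readers unfamiliar with Jackson's named subtypes, a minimal standalone sketch of the mechanism `PathSpec` relies on; the toy `Animal`/`Dog`/`Cat` types are invented for this illustration:

```java
// Self-contained sketch of Jackson named-subtype resolution: the "type" field picks the concrete class.
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonSubTypesExample
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes({
      @JsonSubTypes.Type(name = Dog.TYPE, value = Dog.class),
      @JsonSubTypes.Type(name = "cat", value = Cat.class)
  })
  interface Animal {}

  static class Dog implements Animal
  {
    // constant shared with the subtype registration, analogous to DatasourcePathSpec.TYPE
    static final String TYPE = "dog";
  }

  static class Cat implements Animal {}

  public static void main(String[] args) throws Exception
  {
    Animal animal = new ObjectMapper().readValue("{\"type\": \"dog\"}", Animal.class);
    System.out.println(animal.getClass().getSimpleName());  // prints: Dog
  }
}
```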
@@ -33,6 +33,7 @@ import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.hll.HyperLogLogCollector;
import io.druid.indexer.hadoop.WindowedDataSegment;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;

@@ -47,11 +48,11 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import io.druid.segment.transform.TransformSpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.commons.io.FileUtils;

@@ -149,7 +150,81 @@ public class BatchDeltaIngestionTest
)
);

testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
testIngestion(
config,
expectedRows,
Iterables.getOnlyElement(segments),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts")
);
}

/**
* By default re-indexing expects same aggregators as used by original indexing job. But, with additional flag
* "useNewAggs" in DatasourcePathSpec, user can optionally have any set of aggregators.
* See https://github.com/druid-io/druid/issues/5277 .
*/
@Test
public void testReindexingWithNewAggregators() throws Exception
{
List<WindowedDataSegment> segments = ImmutableList.of(new WindowedDataSegment(SEGMENT, INTERVAL_FULL));

AggregatorFactory[] aggregators = new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum2", "visited_sum"),
new HyperUniquesAggregatorFactory("unique_hosts2", "unique_hosts")
};

Map<String, Object> inputSpec = ImmutableMap.<String, Object>of(
"type",
"dataSource",
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"interval",
INTERVAL_FULL
),
"segments",
segments,
"useNewAggs", true
);

File tmpDir = temporaryFolder.newFolder();

HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
inputSpec,
tmpDir,
aggregators
);

List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTimes.of("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum2", 100L,
"unique_hosts2", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTimes.of("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum2", 150L,
"unique_hosts2", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTimes.of("2014-10-22T02:00:00.000Z"),
"host", ImmutableList.of("c.example.com"),
"visited_sum2", 200L,
"unique_hosts2", 1.0d
)
);

testIngestion(
config,
expectedRows,
Iterables.getOnlyElement(segments),
ImmutableList.of("host"),
ImmutableList.of("visited_sum2", "unique_hosts2")
);
}

@Test

@@ -189,7 +264,13 @@ public class BatchDeltaIngestionTest
)
);

testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
testIngestion(
config,
expectedRows,
Iterables.getOnlyElement(segments),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts")
);
}

@Test

@@ -272,13 +353,21 @@ public class BatchDeltaIngestionTest
)
);

testIngestion(config, expectedRows, Iterables.getOnlyElement(segments));
testIngestion(
config,
expectedRows,
Iterables.getOnlyElement(segments),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts")
);
}

private void testIngestion(
HadoopDruidIndexerConfig config,
List<ImmutableMap<String, Object>> expectedRowsGenerated,
WindowedDataSegment windowedDataSegment
WindowedDataSegment windowedDataSegment,
List<String> expectedDimensions,
List<String> expectedMetrics
) throws Exception
{
IndexGeneratorJob job = new IndexGeneratorJob(config);

@@ -308,9 +397,8 @@ public class BatchDeltaIngestionTest
Assert.assertEquals(INTERVAL_FULL, dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(expectedDimensions, dataSegment.getDimensions());
Assert.assertEquals(expectedMetrics, dataSegment.getMetrics());
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());

HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();

@@ -326,8 +414,8 @@ public class BatchDeltaIngestionTest
Firehose firehose = new IngestSegmentFirehose(
ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
TransformSpec.NONE,
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
expectedDimensions,
expectedMetrics,
null
);

@@ -336,11 +424,21 @@ public class BatchDeltaIngestionTest
rows.add(firehose.nextRow());
}

verifyRows(expectedRowsGenerated, rows);
verifyRows(expectedRowsGenerated, rows, expectedDimensions, expectedMetrics);
}

private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object> inputSpec, File tmpDir)
throws Exception
{
return makeHadoopDruidIndexerConfig(inputSpec, tmpDir, null);
}

private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(
Map<String, Object> inputSpec,
File tmpDir,
AggregatorFactory[] aggregators
)
throws Exception
{
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(

@@ -360,7 +458,7 @@ public class BatchDeltaIngestionTest
),
Map.class
),
new AggregatorFactory[]{
aggregators != null ? aggregators : new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host2")
},

@@ -414,25 +512,37 @@ public class BatchDeltaIngestionTest
return config;
}

private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<InputRow> actualRows)
private void verifyRows(
List<ImmutableMap<String, Object>> expectedRows,
List<InputRow> actualRows,
List<String> expectedDimensions,
List<String> expectedMetrics
)
{
System.out.println("actualRows = " + actualRows);
Assert.assertEquals(expectedRows.size(), actualRows.size());

for (int i = 0; i < expectedRows.size(); i++) {
Map<String, Object> expected = expectedRows.get(i);
InputRow actual = actualRows.get(i);

Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());

Assert.assertEquals(expected.get("time"), actual.getTimestamp());
Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
Assert.assertEquals(expected.get("visited_sum"), actual.getMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
0.001
);

Assert.assertEquals(expectedDimensions, actual.getDimensions());

expectedDimensions.forEach(s -> Assert.assertEquals(expected.get(s), actual.getDimension(s)));

for (String metric : expectedMetrics) {
Object actualValue = actual.getRaw(metric);
if (actualValue instanceof HyperLogLogCollector) {
Assert.assertEquals(
(Double) expected.get(metric),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actualValue, false),
0.001
);
} else {
Assert.assertEquals(expected.get(metric), actual.getMetric(metric));
}
}
}
}
}
@@ -96,7 +96,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false, null),
null
null,
false
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,

@@ -125,7 +126,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
false,
null
),
null
null,
false
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,

@@ -154,7 +156,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
false,
null
),
null
null,
false
);
testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,

@@ -180,7 +183,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
false,
null
),
null
null,
false
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
pathSpec,

@@ -212,7 +216,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
false,
null
),
null
null,
false
)
)
);
@@ -150,7 +150,8 @@ public class DatasourcePathSpecTest
jsonMapper,
null,
ingestionSpec,
Long.valueOf(10)
Long.valueOf(10),
false
);
PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);

@@ -159,7 +160,8 @@ public class DatasourcePathSpecTest
jsonMapper,
null,
ingestionSpec,
null
null,
false
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);

@@ -168,7 +170,18 @@ public class DatasourcePathSpecTest
jsonMapper,
segments,
ingestionSpec,
null
null,
false
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);

expected = new DatasourcePathSpec(
jsonMapper,
segments,
ingestionSpec,
null,
true
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);

@@ -185,7 +198,8 @@ public class DatasourcePathSpecTest
mapper,
segments,
ingestionSpec,
null
null,
false
);

Configuration config = new Configuration();

@@ -226,7 +240,8 @@ public class DatasourcePathSpecTest
mapper,
null,
ingestionSpec,
null
null,
false
);

Configuration config = new Configuration();

@@ -247,7 +262,8 @@ public class DatasourcePathSpecTest
mapper,
null,
ingestionSpec.withIgnoreWhenNoSegments(true),
null
null,
false
);
pathSpec.addInputPaths(hadoopIndexerConfig, job);