add ignoreWhenNoSegments flag to optionally ignore the dataSource inputSpec when no segments were found

This commit is contained in:
Himanshu Gupta 2016-01-26 17:00:12 -06:00
parent 3844658fb5
commit b3437825f0
7 changed files with 207 additions and 53 deletions

View File

@ -55,6 +55,7 @@ Here is what goes inside `ingestionSpec`:
|filter|JSON|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
|ignoreWhenNoSegments|boolean|Whether to ignore this ingestionSpec if no segments are found. By default, an error is thrown when no segments are found.|no|
For example

View File

@ -38,6 +38,7 @@ public class DatasourceIngestionSpec
private final QueryGranularity granularity;
private final List<String> dimensions;
private final List<String> metrics;
private final boolean ignoreWhenNoSegments;
@JsonCreator
public DatasourceIngestionSpec(
@ -47,7 +48,8 @@ public class DatasourceIngestionSpec
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics
@JsonProperty("metrics") List<String> metrics,
@JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
@ -70,6 +72,8 @@ public class DatasourceIngestionSpec
this.dimensions = dimensions;
this.metrics = metrics;
this.ignoreWhenNoSegments = ignoreWhenNoSegments;
}
@JsonProperty
@ -108,19 +112,66 @@ public class DatasourceIngestionSpec
return metrics;
}
/**
 * Tells whether this ingestion spec should be skipped, instead of failing,
 * when no segments are found for it.
 *
 * @return the value of the {@code ignoreWhenNoSegments} flag given at construction
 */
@JsonProperty
public boolean isIgnoreWhenNoSegments()
{
  return this.ignoreWhenNoSegments;
}
/**
 * Creates a copy of this spec that differs only in its dimension list.
 *
 * Every other field, including {@code ignoreWhenNoSegments}, is carried over
 * unchanged. The stale 7-argument constructor call has been dropped: it no
 * longer matches the constructor and made the 8-argument call unreachable.
 *
 * @param dimensions dimension column names for the new spec
 * @return a new {@code DatasourceIngestionSpec} with the given dimensions
 */
public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{
  return new DatasourceIngestionSpec(
      dataSource,
      null, // second constructor argument; presumably the single-interval form, unused since intervals is passed
      intervals,
      filter,
      granularity,
      dimensions,
      metrics,
      ignoreWhenNoSegments
  );
}
/**
 * Creates a copy of this spec that differs only in its metric list.
 *
 * Every other field, including {@code ignoreWhenNoSegments}, is carried over
 * unchanged. The stale 7-argument constructor call has been dropped: it no
 * longer matches the constructor and made the 8-argument call unreachable.
 *
 * @param metrics metric column names for the new spec
 * @return a new {@code DatasourceIngestionSpec} with the given metrics
 */
public DatasourceIngestionSpec withMetrics(List<String> metrics)
{
  return new DatasourceIngestionSpec(
      dataSource,
      null, // second constructor argument; presumably the single-interval form, unused since intervals is passed
      intervals,
      filter,
      granularity,
      dimensions,
      metrics,
      ignoreWhenNoSegments
  );
}
/**
 * Creates a copy of this spec that differs only in its query granularity.
 *
 * Every other field, including {@code ignoreWhenNoSegments}, is carried over
 * unchanged. The stale 7-argument constructor call has been dropped: it no
 * longer matches the constructor and made the 8-argument call unreachable.
 *
 * @param granularity query granularity for the new spec
 * @return a new {@code DatasourceIngestionSpec} with the given granularity
 */
public DatasourceIngestionSpec withQueryGranularity(QueryGranularity granularity)
{
  return new DatasourceIngestionSpec(
      dataSource,
      null, // second constructor argument; presumably the single-interval form, unused since intervals is passed
      intervals,
      filter,
      granularity,
      dimensions,
      metrics,
      ignoreWhenNoSegments
  );
}
/**
 * Creates a copy of this spec that differs only in the
 * {@code ignoreWhenNoSegments} flag; all other fields are carried over.
 *
 * @param ignoreWhenNoSegments flag value for the new spec
 * @return a new {@code DatasourceIngestionSpec} with the given flag
 */
public DatasourceIngestionSpec withIgnoreWhenNoSegments(boolean ignoreWhenNoSegments)
{
  // The parameter shadows the field, so the last argument is the caller's value;
  // the remaining arguments are this instance's current fields.
  final DatasourceIngestionSpec copy = new DatasourceIngestionSpec(
      this.dataSource, null, this.intervals, this.filter,
      this.granularity, this.dimensions, this.metrics,
      ignoreWhenNoSegments
  );
  return copy;
}
@Override
@ -135,6 +186,9 @@ public class DatasourceIngestionSpec
DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
if (ignoreWhenNoSegments != that.ignoreWhenNoSegments) {
return false;
}
if (!dataSource.equals(that.dataSource)) {
return false;
}
@ -163,6 +217,7 @@ public class DatasourceIngestionSpec
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
return result;
}
@ -176,6 +231,7 @@ public class DatasourceIngestionSpec
", granularity=" + granularity +
", dimensions=" + dimensions +
", metrics=" + metrics +
", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
'}';
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
@ -93,7 +94,14 @@ public class DatasourcePathSpec implements PathSpec
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided");
if (segments == null || segments.isEmpty()) {
if (ingestionSpec.isIgnoreWhenNoSegments()) {
logger.warn("No segments found for ingestionSpec [%s]", ingestionSpec);
return job;
} else {
throw new ISE("No segments found for ingestion spec [%s]", ingestionSpec);
}
}
logger.info(
"Found total [%d] segments for [%s] in interval [%s]",

View File

@ -91,7 +91,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -111,7 +111,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null, false),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -133,7 +133,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null),
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
null
)
)

View File

@ -49,7 +49,8 @@ public class DatasourceIngestionSpecTest
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
Lists.newArrayList("m1", "m2", "m3"),
false
);
DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class);
@ -60,22 +61,63 @@ public class DatasourceIngestionSpecTest
@Test
public void testMultiIntervalSerde() throws Exception
{
//defaults
String jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"]\n"
+ "}\n";
DatasourceIngestionSpec actual = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class)
),
DatasourceIngestionSpec.class
);
List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017"));
DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
null,
intervals,
null,
null,
null,
null,
false
);
Assert.assertEquals(expected, actual);
//non-defaults
jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"],\n"
+ " \"filter\": { \"type\": \"selector\", \"dimension\": \"dim\", \"value\": \"value\"},\n"
+ " \"granularity\": \"day\",\n"
+ " \"dimensions\": [\"d1\", \"d2\"],\n"
+ " \"metrics\": [\"m1\", \"m2\", \"m3\"],\n"
+ " \"ignoreWhenNoSegments\": true\n"
+ "}\n";
expected = new DatasourceIngestionSpec(
"test",
null,
intervals,
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
Lists.newArrayList("m1", "m2", "m3"),
true
);
DatasourceIngestionSpec actual = MAPPER.readValue(
MAPPER.writeValueAsString(expected),
actual = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class)
),
DatasourceIngestionSpec.class
);
Assert.assertEquals(intervals, actual.getIntervals());
Assert.assertEquals(expected, actual);
}
@ -86,7 +128,7 @@ public class DatasourceIngestionSpecTest
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null),
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
actual
);
}

View File

@ -68,7 +68,8 @@ public class DatasourceRecordReaderTest
null,
null,
segment.getDimensions(),
segment.getMetrics()
segment.getMetrics(),
false
)
)
);

View File

@ -28,6 +28,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
@ -78,7 +79,8 @@ public class DatasourcePathSpecTest
null,
null,
null,
null
null,
false
);
segments = ImmutableList.of(
@ -174,43 +176,7 @@ public class DatasourcePathSpecTest
@Test
public void testAddInputPaths() throws Exception
{
HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
),
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
HadoopDruidIndexerConfig hadoopIndexerConfig = makeHadoopDruidIndexerConfig();
ObjectMapper mapper = new DefaultObjectMapper();
@ -247,4 +213,84 @@ public class DatasourcePathSpecTest
actualIngestionSpec
);
}
/**
 * Checks both behaviours of {@code addInputPaths} when the path spec has no
 * segments: it throws an {@code ISE} for the shared {@code ingestionSpec}, and
 * it returns without writing the segment/schema configuration entries once
 * {@code ignoreWhenNoSegments} is switched on.
 */
@Test
public void testAddInputPathsWithNoSegments() throws Exception
{
  final HadoopDruidIndexerConfig indexerConfig = makeHadoopDruidIndexerConfig();
  final ObjectMapper jsonMapper = new DefaultObjectMapper();

  // Second constructor argument is null, presumably the segment list, which
  // is what triggers the "no segments" path.
  final DatasourcePathSpec strictPathSpec = new DatasourcePathSpec(jsonMapper, null, ingestionSpec, null);

  final Configuration hadoopConf = new Configuration();
  final Job mockJob = EasyMock.createNiceMock(Job.class);
  EasyMock.expect(mockJob.getConfiguration()).andReturn(hadoopConf).anyTimes();
  EasyMock.replay(mockJob);

  try {
    strictPathSpec.addInputPaths(indexerConfig, mockJob);
    Assert.fail("should've been ISE");
  }
  catch (ISE expected) {
    // expected when the flag is not set
  }

  // Same setup, but with the ignoreWhenNoSegments flag turned on.
  final DatasourcePathSpec lenientPathSpec = new DatasourcePathSpec(
      jsonMapper,
      null,
      ingestionSpec.withIgnoreWhenNoSegments(true),
      null
  );
  lenientPathSpec.addInputPaths(indexerConfig, mockJob);

  // Neither entry may have been written to the job configuration.
  Assert.assertNull(hadoopConf.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS));
  Assert.assertNull(hadoopConf.get(DatasourceInputFormat.CONF_DRUID_SCHEMA));
}
/**
 * Builds the {@code HadoopDruidIndexerConfig} fixture shared by the tests in
 * this class: a CSV-parsed schema for the test ingestion spec's data source
 * with one long-sum aggregator, a static-path IO config, and the default
 * tuning config with a fixed working path and version.
 *
 * @return a freshly constructed indexer config
 */
private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig()
{
  final StringInputRowParser rowParser = new StringInputRowParser(
      new CSVParseSpec(
          new TimestampSpec("timestamp", "yyyyMMddHH", null),
          new DimensionsSpec(null, null, null),
          null,
          ImmutableList.of("timestamp", "host", "visited")
      )
  );

  final AggregatorFactory[] aggregators = new AggregatorFactory[]{
      new LongSumAggregatorFactory("visited_sum", "visited")
  };

  final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(
      Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
  );

  final DataSchema dataSchema = new DataSchema(
      ingestionSpec.getDataSource(),
      // The parser is passed to the schema as a generic map, not as the typed object.
      HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(rowParser, Map.class),
      aggregators,
      granularitySpec,
      HadoopDruidIndexerConfig.JSON_MAPPER
  );

  final HadoopIOConfig ioConfig = new HadoopIOConfig(
      ImmutableMap.<String, Object>of(
          "paths",
          "/tmp/dummy",
          "type",
          "static"
      ),
      null,
      "/tmp/dummy"
  );

  return new HadoopDruidIndexerConfig(
      new HadoopIngestionSpec(
          dataSchema,
          ioConfig,
          HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
      )
  );
}
}