Merge pull request #2340 from himanshug/data_src_input_no_segments

add ignoreWhenNoSegments flag to optionally ignore no segments error
This commit is contained in:
Bingkun Guo 2016-01-26 18:45:13 -06:00
commit cac675351e
7 changed files with 207 additions and 53 deletions

View File

@ -55,6 +55,7 @@ Here is what goes inside `ingestionSpec`:
|filter|JSON|See [Filters](../querying/filters.html)|no| |filter|JSON|See [Filters](../querying/filters.html)|no|
|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no| |dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no|
|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no| |metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
|ignoreWhenNoSegments|boolean|Whether to ignore this ingestionSpec if no segments were found. Default behavior is to throw error when no segments were found.|no|
For example For example

View File

@ -38,6 +38,7 @@ public class DatasourceIngestionSpec
private final QueryGranularity granularity; private final QueryGranularity granularity;
private final List<String> dimensions; private final List<String> dimensions;
private final List<String> metrics; private final List<String> metrics;
private final boolean ignoreWhenNoSegments;
@JsonCreator @JsonCreator
public DatasourceIngestionSpec( public DatasourceIngestionSpec(
@ -47,7 +48,8 @@ public class DatasourceIngestionSpec
@JsonProperty("filter") DimFilter filter, @JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions, @JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics @JsonProperty("metrics") List<String> metrics,
@JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
) )
{ {
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
@ -70,6 +72,8 @@ public class DatasourceIngestionSpec
this.dimensions = dimensions; this.dimensions = dimensions;
this.metrics = metrics; this.metrics = metrics;
this.ignoreWhenNoSegments = ignoreWhenNoSegments;
} }
@JsonProperty @JsonProperty
@ -108,19 +112,66 @@ public class DatasourceIngestionSpec
return metrics; return metrics;
} }
@JsonProperty
public boolean isIgnoreWhenNoSegments()
{
return ignoreWhenNoSegments;
}
public DatasourceIngestionSpec withDimensions(List<String> dimensions) public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{ {
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics); return new DatasourceIngestionSpec(
dataSource,
null,
intervals,
filter,
granularity,
dimensions,
metrics,
ignoreWhenNoSegments
);
} }
public DatasourceIngestionSpec withMetrics(List<String> metrics) public DatasourceIngestionSpec withMetrics(List<String> metrics)
{ {
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics); return new DatasourceIngestionSpec(
dataSource,
null,
intervals,
filter,
granularity,
dimensions,
metrics,
ignoreWhenNoSegments
);
} }
public DatasourceIngestionSpec withQueryGranularity(QueryGranularity granularity) public DatasourceIngestionSpec withQueryGranularity(QueryGranularity granularity)
{ {
return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics); return new DatasourceIngestionSpec(
dataSource,
null,
intervals,
filter,
granularity,
dimensions,
metrics,
ignoreWhenNoSegments
);
}
public DatasourceIngestionSpec withIgnoreWhenNoSegments(boolean ignoreWhenNoSegments)
{
return new DatasourceIngestionSpec(
dataSource,
null,
intervals,
filter,
granularity,
dimensions,
metrics,
ignoreWhenNoSegments
);
} }
@Override @Override
@ -135,6 +186,9 @@ public class DatasourceIngestionSpec
DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
if (ignoreWhenNoSegments != that.ignoreWhenNoSegments) {
return false;
}
if (!dataSource.equals(that.dataSource)) { if (!dataSource.equals(that.dataSource)) {
return false; return false;
} }
@ -163,6 +217,7 @@ public class DatasourceIngestionSpec
result = 31 * result + granularity.hashCode(); result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0); result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
return result; return result;
} }
@ -176,6 +231,7 @@ public class DatasourceIngestionSpec
", granularity=" + granularity + ", granularity=" + granularity +
", dimensions=" + dimensions + ", dimensions=" + dimensions +
", metrics=" + metrics + ", metrics=" + metrics +
", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
'}'; '}';
} }
} }

View File

@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexer.hadoop.DatasourceIngestionSpec;
@ -93,7 +94,14 @@ public class DatasourcePathSpec implements PathSpec
HadoopDruidIndexerConfig config, Job job HadoopDruidIndexerConfig config, Job job
) throws IOException ) throws IOException
{ {
Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided"); if (segments == null || segments.isEmpty()) {
if (ingestionSpec.isIgnoreWhenNoSegments()) {
logger.warn("No segments found for ingestionSpec [%s]", ingestionSpec);
return job;
} else {
throw new ISE("No segments found for ingestion spec [%s]", ingestionSpec);
}
}
logger.info( logger.info(
"Found total [%d] segments for [%s] in interval [%s]", "Found total [%d] segments for [%s] in interval [%s]",

View File

@ -91,7 +91,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec( PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper, jsonMapper,
null, null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null), new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
null null
); );
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -111,7 +111,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
PathSpec pathSpec = new DatasourcePathSpec( PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper, jsonMapper,
null, null,
new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null), new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null, false),
null null
); );
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@ -133,7 +133,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
new DatasourcePathSpec( new DatasourcePathSpec(
jsonMapper, jsonMapper,
null, null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null), new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
null null
) )
) )

View File

@ -49,7 +49,8 @@ public class DatasourceIngestionSpecTest
new SelectorDimFilter("dim", "value"), new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY, QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"), Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3") Lists.newArrayList("m1", "m2", "m3"),
false
); );
DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class); DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class);
@ -60,22 +61,63 @@ public class DatasourceIngestionSpecTest
@Test @Test
public void testMultiIntervalSerde() throws Exception public void testMultiIntervalSerde() throws Exception
{ {
//defaults
String jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"]\n"
+ "}\n";
DatasourceIngestionSpec actual = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class)
),
DatasourceIngestionSpec.class
);
List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017")); List<Interval> intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017"));
DatasourceIngestionSpec expected = new DatasourceIngestionSpec( DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
null,
intervals,
null,
null,
null,
null,
false
);
Assert.assertEquals(expected, actual);
//non-defaults
jsonStr = "{\n"
+ " \"dataSource\": \"test\",\n"
+ " \"intervals\": [\"2014/2015\", \"2016/2017\"],\n"
+ " \"filter\": { \"type\": \"selector\", \"dimension\": \"dim\", \"value\": \"value\"},\n"
+ " \"granularity\": \"day\",\n"
+ " \"dimensions\": [\"d1\", \"d2\"],\n"
+ " \"metrics\": [\"m1\", \"m2\", \"m3\"],\n"
+ " \"ignoreWhenNoSegments\": true\n"
+ "}\n";
expected = new DatasourceIngestionSpec(
"test", "test",
null, null,
intervals, intervals,
new SelectorDimFilter("dim", "value"), new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY, QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"), Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3") Lists.newArrayList("m1", "m2", "m3"),
true
); );
DatasourceIngestionSpec actual = MAPPER.readValue( actual = MAPPER.readValue(
MAPPER.writeValueAsString(expected), MAPPER.writeValueAsString(
MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class)
),
DatasourceIngestionSpec.class DatasourceIngestionSpec.class
); );
Assert.assertEquals(intervals, actual.getIntervals());
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@ -86,7 +128,7 @@ public class DatasourceIngestionSpecTest
DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class); DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
Assert.assertEquals( Assert.assertEquals(
new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null), new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
actual actual
); );
} }

View File

@ -68,7 +68,8 @@ public class DatasourceRecordReaderTest
null, null,
null, null,
segment.getDimensions(), segment.getDimensions(),
segment.getMetrics() segment.getMetrics(),
false
) )
) )
); );

View File

@ -28,6 +28,7 @@ import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.metamx.common.Granularity; import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
@ -78,7 +79,8 @@ public class DatasourcePathSpecTest
null, null,
null, null,
null, null,
null null,
false
); );
segments = ImmutableList.of( segments = ImmutableList.of(
@ -174,43 +176,7 @@ public class DatasourcePathSpecTest
@Test @Test
public void testAddInputPaths() throws Exception public void testAddInputPaths() throws Exception
{ {
HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig( HadoopDruidIndexerConfig hadoopIndexerConfig = makeHadoopDruidIndexerConfig();
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
),
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
ObjectMapper mapper = new DefaultObjectMapper(); ObjectMapper mapper = new DefaultObjectMapper();
@ -247,4 +213,84 @@ public class DatasourcePathSpecTest
actualIngestionSpec actualIngestionSpec
); );
} }
@Test
public void testAddInputPathsWithNoSegments() throws Exception
{
HadoopDruidIndexerConfig hadoopIndexerConfig = makeHadoopDruidIndexerConfig();
ObjectMapper mapper = new DefaultObjectMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec(
mapper,
null,
ingestionSpec,
null
);
Configuration config = new Configuration();
Job job = EasyMock.createNiceMock(Job.class);
EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(job);
try {
pathSpec.addInputPaths(hadoopIndexerConfig, job);
Assert.fail("should've been ISE");
}
catch (ISE ex) {
//OK
}
//now with ignoreWhenNoSegments flag set
pathSpec = new DatasourcePathSpec(
mapper,
null,
ingestionSpec.withIgnoreWhenNoSegments(true),
null
);
pathSpec.addInputPaths(hadoopIndexerConfig, job);
Assert.assertNull(config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS));
Assert.assertNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA));
}
private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig()
{
return new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
),
HadoopDruidIndexerConfig.JSON_MAPPER
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
}
} }