From b3437825f00f8d920281f3fe7373da10786c69c5 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 26 Jan 2016 17:00:12 -0600 Subject: [PATCH] add ignoreWhenNoSegments flag to optionally ignore the dataSource inputSpec when no segments were found --- .../content/ingestion/update-existing-data.md | 1 + .../hadoop/DatasourceIngestionSpec.java | 64 ++++++++- .../indexer/path/DatasourcePathSpec.java | 10 +- ...cUpdateDatasourcePathSpecSegmentsTest.java | 6 +- .../hadoop/DatasourceIngestionSpecTest.java | 54 +++++++- .../hadoop/DatasourceRecordReaderTest.java | 3 +- .../indexer/path/DatasourcePathSpecTest.java | 122 ++++++++++++------ 7 files changed, 207 insertions(+), 53 deletions(-) diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md index 646b0f9d29c..761744f40c3 100644 --- a/docs/content/ingestion/update-existing-data.md +++ b/docs/content/ingestion/update-existing-data.md @@ -55,6 +55,7 @@ Here is what goes inside `ingestionSpec`: |filter|JSON|See [Filters](../querying/filters.html)|no| |dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions then all the dimension columns present in stored data will be read.|no| |metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no| +|ignoreWhenNoSegments|boolean|Whether to ignore this ingestionSpec if no segments were found. Default behavior is to throw error when no segments were found.|no| For example diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java index 90ac234a864..497806a408e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -38,6 +38,7 @@ public class DatasourceIngestionSpec private final QueryGranularity granularity; private final List dimensions; private final List metrics; + private final boolean ignoreWhenNoSegments; @JsonCreator public DatasourceIngestionSpec( @@ -47,7 +48,8 @@ public class DatasourceIngestionSpec @JsonProperty("filter") DimFilter filter, @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("dimensions") List dimensions, - @JsonProperty("metrics") List metrics + @JsonProperty("metrics") List metrics, + @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments ) { this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); @@ -70,6 +72,8 @@ public class DatasourceIngestionSpec this.dimensions = dimensions; this.metrics = metrics; + + this.ignoreWhenNoSegments = ignoreWhenNoSegments; } @JsonProperty @@ -108,19 +112,66 @@ public class DatasourceIngestionSpec return metrics; } + @JsonProperty + public boolean isIgnoreWhenNoSegments() + { + return ignoreWhenNoSegments; + } + public DatasourceIngestionSpec withDimensions(List dimensions) { - return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics); + return new DatasourceIngestionSpec( + dataSource, + null, + intervals, + filter, + granularity, + dimensions, + metrics, + ignoreWhenNoSegments + ); } public DatasourceIngestionSpec withMetrics(List metrics) { - return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, 
dimensions, metrics); + return new DatasourceIngestionSpec( + dataSource, + null, + intervals, + filter, + granularity, + dimensions, + metrics, + ignoreWhenNoSegments + ); } public DatasourceIngestionSpec withQueryGranularity(QueryGranularity granularity) { - return new DatasourceIngestionSpec(dataSource, null, intervals, filter, granularity, dimensions, metrics); + return new DatasourceIngestionSpec( + dataSource, + null, + intervals, + filter, + granularity, + dimensions, + metrics, + ignoreWhenNoSegments + ); + } + + public DatasourceIngestionSpec withIgnoreWhenNoSegments(boolean ignoreWhenNoSegments) + { + return new DatasourceIngestionSpec( + dataSource, + null, + intervals, + filter, + granularity, + dimensions, + metrics, + ignoreWhenNoSegments + ); } @Override @@ -135,6 +186,9 @@ public class DatasourceIngestionSpec DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; + if (ignoreWhenNoSegments != that.ignoreWhenNoSegments) { + return false; + } if (!dataSource.equals(that.dataSource)) { return false; } @@ -163,6 +217,7 @@ public class DatasourceIngestionSpec result = 31 * result + granularity.hashCode(); result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + result = 31 * result + (ignoreWhenNoSegments ? 1 : 0); return result; } @@ -176,6 +231,7 @@ public class DatasourceIngestionSpec ", granularity=" + granularity + ", dimensions=" + dimensions + ", metrics=" + metrics + + ", ignoreWhenNoSegments=" + ignoreWhenNoSegments + '}'; } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index b4d0382910c..27cd4054b0c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.hadoop.DatasourceIngestionSpec; @@ -93,7 +94,14 @@ public class DatasourcePathSpec implements PathSpec HadoopDruidIndexerConfig config, Job job ) throws IOException { - Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided"); + if (segments == null || segments.isEmpty()) { + if (ingestionSpec.isIgnoreWhenNoSegments()) { + logger.warn("No segments found for ingestionSpec [%s]", ingestionSpec); + return job; + } else { + throw new ISE("No segments found for ingestion spec [%s]", ingestionSpec); + } + } logger.info( "Found total [%d] segments for [%s] in interval [%s]", diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java index 3fe6cb0719d..acedfe7c105 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -91,7 +91,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest PathSpec pathSpec = new DatasourcePathSpec( jsonMapper, null, - new 
DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null), + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false), null ); HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( @@ -111,7 +111,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest PathSpec pathSpec = new DatasourcePathSpec( jsonMapper, null, - new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null), + new DatasourceIngestionSpec(testDatasource, testDatasourceIntervalPartial, null, null, null, null, null, false), null ); HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( @@ -133,7 +133,7 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest new DatasourcePathSpec( jsonMapper, null, - new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null), + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false), null ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java index a9c3b3e8d8c..8b2b837bc12 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java @@ -49,7 +49,8 @@ public class DatasourceIngestionSpecTest new SelectorDimFilter("dim", "value"), QueryGranularity.DAY, Lists.newArrayList("d1", "d2"), - Lists.newArrayList("m1", "m2", "m3") + Lists.newArrayList("m1", "m2", "m3"), + false ); DatasourceIngestionSpec actual = MAPPER.readValue(MAPPER.writeValueAsString(expected), DatasourceIngestionSpec.class); @@ -60,22 +61,63 @@ public class DatasourceIngestionSpecTest @Test public void testMultiIntervalSerde() throws Exception { + //defaults + String jsonStr = "{\n" + + " \"dataSource\": \"test\",\n" + + " \"intervals\": [\"2014/2015\", \"2016/2017\"]\n" + + "}\n"; + + DatasourceIngestionSpec actual = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class) + ), + DatasourceIngestionSpec.class + ); + List intervals = ImmutableList.of(Interval.parse("2014/2015"), Interval.parse("2016/2017")); + DatasourceIngestionSpec expected = new DatasourceIngestionSpec( + "test", + null, + intervals, + null, + null, + null, + null, + false + ); + + Assert.assertEquals(expected, actual); + + //non-defaults + jsonStr = "{\n" + + " \"dataSource\": \"test\",\n" + + " \"intervals\": [\"2014/2015\", \"2016/2017\"],\n" + + " \"filter\": { \"type\": \"selector\", \"dimension\": \"dim\", \"value\": \"value\"},\n" + + " \"granularity\": \"day\",\n" + + " \"dimensions\": [\"d1\", \"d2\"],\n" + + " \"metrics\": [\"m1\", \"m2\", \"m3\"],\n" + + " \"ignoreWhenNoSegments\": true\n" + + "}\n"; + + expected = new DatasourceIngestionSpec( "test", null, intervals, new SelectorDimFilter("dim", "value"), QueryGranularity.DAY, Lists.newArrayList("d1", "d2"), - Lists.newArrayList("m1", "m2", "m3") + Lists.newArrayList("m1", "m2", "m3"), + true ); - DatasourceIngestionSpec actual = MAPPER.readValue( - MAPPER.writeValueAsString(expected), + actual = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class) + ), DatasourceIngestionSpec.class ); - Assert.assertEquals(intervals, 
actual.getIntervals()); + Assert.assertEquals(expected, actual); } @@ -86,7 +128,7 @@ public class DatasourceIngestionSpecTest DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class); Assert.assertEquals( - new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null), + new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false), actual ); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java index 0d99dbb947d..b910e9b49b0 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -68,7 +68,8 @@ public class DatasourceRecordReaderTest null, null, segment.getDimensions(), - segment.getMetrics() + segment.getMetrics(), + false ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index 9760d2f162a..6385c79544c 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -28,6 +28,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; import com.metamx.common.Granularity; +import com.metamx.common.ISE; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; @@ -78,7 +79,8 @@ public class DatasourcePathSpecTest null, null, null, - null + null, + false ); segments = ImmutableList.of( @@ -174,43 +176,7 @@ public class DatasourcePathSpecTest @Test public void testAddInputPaths() throws Exception { - HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig( - new HadoopIngestionSpec( - new DataSchema( - ingestionSpec.getDataSource(), - HadoopDruidIndexerConfig.JSON_MAPPER.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(null, null, null), - null, - ImmutableList.of("timestamp", "host", "visited") - ) - ), - Map.class - ), - new AggregatorFactory[]{ - new LongSumAggregatorFactory("visited_sum", "visited") - }, - new UniformGranularitySpec( - Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) - ), - HadoopDruidIndexerConfig.JSON_MAPPER - ), - new HadoopIOConfig( - ImmutableMap.of( - "paths", - "/tmp/dummy", - "type", - "static" - ), - null, - "/tmp/dummy" - ), - HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") - ) - ); - + HadoopDruidIndexerConfig hadoopIndexerConfig = makeHadoopDruidIndexerConfig(); ObjectMapper mapper = new DefaultObjectMapper(); @@ -247,4 +213,84 @@ public class DatasourcePathSpecTest actualIngestionSpec ); } + + @Test + public void testAddInputPathsWithNoSegments() throws Exception + { + HadoopDruidIndexerConfig hadoopIndexerConfig = makeHadoopDruidIndexerConfig(); + + ObjectMapper mapper = new DefaultObjectMapper(); + + DatasourcePathSpec pathSpec = new DatasourcePathSpec( + mapper, + null, + ingestionSpec, + null + ); + + Configuration config = new Configuration(); + Job job = EasyMock.createNiceMock(Job.class); + 
EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(job); + + try { + pathSpec.addInputPaths(hadoopIndexerConfig, job); + Assert.fail("should've been ISE"); + } + catch (ISE ex) { + //OK + } + + //now with ignoreWhenNoSegments flag set + pathSpec = new DatasourcePathSpec( + mapper, + null, + ingestionSpec.withIgnoreWhenNoSegments(true), + null + ); + pathSpec.addInputPaths(hadoopIndexerConfig, job); + + Assert.assertNull(config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS)); + Assert.assertNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA)); + } + + private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig() + { + return new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + ingestionSpec.getDataSource(), + HadoopDruidIndexerConfig.JSON_MAPPER.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + Map.class + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) + ), + HadoopDruidIndexerConfig.JSON_MAPPER + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + } }
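
For reviewers who want to see the new flag in context, the `ingestionSpec` shape exercised by the "non-defaults" case added to `DatasourceIngestionSpecTest` looks like this (the field values are the test's own and purely illustrative):

```json
{
  "dataSource": "test",
  "intervals": ["2014/2015", "2016/2017"],
  "filter": { "type": "selector", "dimension": "dim", "value": "value" },
  "granularity": "day",
  "dimensions": ["d1", "d2"],
  "metrics": ["m1", "m2", "m3"],
  "ignoreWhenNoSegments": true
}
```

When `ignoreWhenNoSegments` is omitted it defaults to `false`, which preserves the existing behavior of failing the job when the requested interval resolves to no segments.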
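As a usage sketch (not part of this patch), the spec above would normally sit inside the `dataSource` inputSpec described in `update-existing-data.md`; the data source name and interval below are placeholders, and the flag only changes behavior when that interval turns out to contain no segments, e.g. on the first run of a recurring re-indexing job:

```json
"inputSpec": {
  "type": "dataSource",
  "ingestionSpec": {
    "dataSource": "wikipedia",
    "intervals": ["2014/2015"],
    "ignoreWhenNoSegments": true
  }
}
```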