From 74f4572bd49a04b4ca381fcd7b8f360c8ef52480 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 31 Aug 2015 22:24:10 -0500 Subject: [PATCH 1/2] Lazily deserialize "parser" to InputRowParser in DataSchema, so that user-supplied Hadoop-related InputRowParsers are created only when needed. This allows the overlord to accept a HadoopIndexTask with a hadoopy InputRowParser and not fail, because a hadoopy InputRowParser might need Hadoop libraries. --- .../indexer/BatchDeltaIngestionTest.java | 20 ++- .../DetermineHashedPartitionsJobTest.java | 38 +++-- .../indexer/DeterminePartitionsJobTest.java | 20 ++- .../indexer/HadoopDruidIndexerConfigTest.java | 24 ++- .../indexer/HadoopIngestionSpecTest.java | 9 +- ...cUpdateDatasourcePathSpecSegmentsTest.java | 16 +- .../indexer/IndexGeneratorCombinerTest.java | 21 ++- .../druid/indexer/IndexGeneratorJobTest.java | 20 ++- .../java/io/druid/indexer/JobHelperTest.java | 20 ++- .../indexer/path/DatasourcePathSpecTest.java | 21 ++- .../updater/HadoopConverterJobTest.java | 22 +-- .../indexing/common/TestRealtimeTask.java | 8 +- .../indexing/common/task/IndexTaskTest.java | 111 ++++++++------ .../indexing/common/task/TaskSerdeTest.java | 18 ++- .../overlord/RemoteTaskRunnerTest.java | 12 +- .../indexing/overlord/TaskLifecycleTest.java | 14 +- .../indexing/worker/TaskAnnouncementTest.java | 2 +- .../io/druid/segment/indexing/DataSchema.java | 144 +++++++++++++----- .../granularity/ArbitraryGranularitySpec.java | 29 ++++ .../segment/indexing/DataSchemaTest.java | 129 ++++++++++++++-- .../segment/realtime/FireDepartmentTest.java | 33 ++-- .../segment/realtime/RealtimeManagerTest.java | 10 +- .../plumber/RealtimePlumberSchoolTest.java | 41 +++-- .../segment/realtime/plumber/SinkTest.java | 4 +- 24 files changed, 546 insertions(+), 240 deletions(-) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 2abea2641c9..d7bf8d421b5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -340,13 +340,16 @@ public class BatchDeltaIngestionTest new HadoopIngestionSpec( new DataSchema( "website", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host"), null, null), - null, - ImmutableList.of("timestamp", "host", "host2", "visited_num") - ) + MAPPER.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "host2", "visited_num") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("visited_sum", "visited_num"), @@ -354,7 +357,8 @@ public class BatchDeltaIngestionTest }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(INTERVAL_FULL) - ) + ), + MAPPER ), new HadoopIOConfig( inputSpec, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 4fd7a371675..c10a575f7ca 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -108,26 +108,36 @@ public class 
DetermineHashedPartitionsJobTest HadoopIngestionSpec ingestionSpec = new HadoopIngestionSpec( new DataSchema( "test_schema", - new StringInputRowParser( - new DelimitedParseSpec( - new TimestampSpec("ts", null, null), - new DimensionsSpec(ImmutableList.of("market", "quality", "placement", "placementish"), null, null), - "\t", - null, - Arrays.asList("ts", - "market", - "quality", - "placement", - "placementish", - "index") - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new DelimitedParseSpec( + new TimestampSpec("ts", null, null), + new DimensionsSpec( + ImmutableList.of("market", "quality", "placement", "placementish"), + null, + null + ), + "\t", + null, + Arrays.asList( + "ts", + "market", + "quality", + "placement", + "placementish", + "index" + ) + ) + ), + Map.class ), new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")}, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval)) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 574acaa44b5..e020bd72034 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -223,18 +223,22 @@ public class DeterminePartitionsJobTest new HadoopIngestionSpec( new DataSchema( "website", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host", "country"), null, null), - null, - ImmutableList.of("timestamp", "host", "country", "visited_num") - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host", "country"), null, null), + null, + ImmutableList.of("timestamp", "host", "country", "visited_num") + ) + ), + Map.class ), new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")}, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval)) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 00b7b36577b..5447b90c7f8 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -17,6 +17,7 @@ package io.druid.indexer; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -45,7 +46,11 @@ import java.util.List; */ public class HadoopDruidIndexerConfigTest { - private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final ObjectMapper jsonMapper; + static { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + } public static T jsonReadWriteRead(String s, Class klass) { @@ -175,12 +180,17 @@ public class 
HadoopDruidIndexerConfigTest HadoopIngestionSpec spec = new HadoopIngestionSpec( new DataSchema( - "foo", null, new AggregatorFactory[0], new UniformGranularitySpec( - Granularity.MINUTE, - QueryGranularity.MINUTE, - ImmutableList.of(new Interval("2010-01-01/P1D")) - ) - ), new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null), + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec( + Granularity.MINUTE, + QueryGranularity.MINUTE, + ImmutableList.of(new Interval("2010-01-01/P1D")) + ), + jsonMapper + ), + new HadoopIOConfig(ImmutableMap.of("paths", "bar", "type", "static"), null, null), new HadoopTuningConfig( null, null, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java index d34d204a9b5..9fed2d9ac45 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java @@ -17,15 +17,16 @@ package io.druid.indexer; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.druid.indexer.partitions.HashedPartitionsSpec; -import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.indexer.partitions.SingleDimensionPartitionsSpec; import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import org.joda.time.Interval; import org.junit.Assert; @@ -33,7 +34,11 @@ import org.junit.Test; public class HadoopIngestionSpecTest { - private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final ObjectMapper jsonMapper; + static { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + } @Test public void testGranularitySpec() diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java index b82b00d666e..0060c1fb98f 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -19,6 +19,9 @@ package io.druid.indexer; +import com.fasterxml.jackson.databind.BeanProperty; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -50,7 +53,15 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest private final String testDatasource = "test"; private final Interval testDatasourceInterval = new Interval("1970/3000"); private final Interval testDatasourceIntervalPartial = new Interval("2050/3000"); - private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private final ObjectMapper jsonMapper; + + public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest() + { + 
jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues( + new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper) + ); + } private static final DataSegment SEGMENT = new DataSegment( "test1", @@ -155,7 +166,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest ImmutableList.of( new Interval("2010-01-01/P1D") ) - ) + ), + jsonMapper ), new HadoopIOConfig( jsonMapper.convertValue(datasourcePathSpec, Map.class), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java index ced64faa398..4397a71605d 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -47,6 +47,7 @@ import org.junit.Before; import org.junit.Test; import java.util.List; +import java.util.Map; /** */ @@ -62,13 +63,16 @@ public class IndexGeneratorCombinerTest new HadoopIngestionSpec( new DataSchema( "website", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host"), null, null), - null, - ImmutableList.of("timestamp", "host", "visited") - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("visited_sum", "visited"), @@ -76,7 +80,8 @@ public class IndexGeneratorCombinerTest }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011")) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index b8ebbb05070..74ba51ccd04 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -331,13 +331,16 @@ public class IndexGeneratorJobTest new HadoopIngestionSpec( new DataSchema( "website", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host"), null, null), - null, - ImmutableList.of("timestamp", "host", "visited_num") - ) + mapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("visited_num", "visited_num"), @@ -345,7 +348,8 @@ public class IndexGeneratorJobTest }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) - ) + ), + mapper ), new HadoopIOConfig( ImmutableMap.copyOf(inputSpec), diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 3334a70ca89..d893900687d 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ 
b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -66,18 +66,22 @@ public class JobHelperTest new HadoopIngestionSpec( new DataSchema( "website", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host"), null, null), - null, - ImmutableList.of("timestamp", "host", "visited_num") - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ), + Map.class ), new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")}, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index 54a92198b64..de2c0d22e53 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -60,6 +60,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.List; +import java.util.Map; /** */ @@ -176,20 +177,24 @@ public class DatasourcePathSpecTest new HadoopIngestionSpec( new DataSchema( ingestionSpec.getDataSource(), - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(null, null, null), - null, - ImmutableList.of("timestamp", "host", "visited") - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("visited_sum", "visited") }, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index be2c6c3301d..4170edf922a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -156,14 +156,17 @@ public class HadoopConverterJobTest new HadoopIngestionSpec( new DataSchema( DATASOURCE, - new StringInputRowParser( - new DelimitedParseSpec( - new TimestampSpec("ts", "iso", null), - new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null), - "\t", - "\u0001", - Arrays.asList(TestIndex.COLUMNS) - ) + HadoopDruidIndexerConfig.jsonMapper.convertValue( + new StringInputRowParser( + new DelimitedParseSpec( + new TimestampSpec("ts", "iso", null), + new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null), + "\t", + "\u0001", + Arrays.asList(TestIndex.COLUMNS) + ) + ), + Map.class ), new AggregatorFactory[]{ new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]), @@ -173,7 +176,8 @@ public class HadoopConverterJobTest 
Granularity.MONTH, QueryGranularity.DAY, ImmutableList.of(interval) - ) + ), + HadoopDruidIndexerConfig.jsonMapper ), new HadoopIOConfig( ImmutableMap.of( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 9e22e37235b..6fa2d8d3fe7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -17,11 +17,14 @@ package io.druid.indexing.common; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.TaskResource; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; @@ -46,14 +49,15 @@ public class TestRealtimeTask extends RealtimeIndexTask @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("dataSource") String dataSource, - @JsonProperty("taskStatus") TaskStatus status + @JsonProperty("taskStatus") TaskStatus status, + @JacksonInject ObjectMapper mapper ) { super( id, taskResource, new FireDepartment( - new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), new RealtimeIOConfig( + new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper), new RealtimeIOConfig( new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index f923bcb1853..db30ddc4033 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -17,6 +17,7 @@ package io.druid.indexing.common.task; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import io.druid.data.input.impl.CSVParseSpec; @@ -56,12 +57,14 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.Arrays; import java.util.List; +import java.util.Map; public class IndexTaskTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); private final IndexSpec indexSpec = new IndexSpec(); + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Test public void testDeterminePartitions() throws Exception @@ -82,21 +85,24 @@ public class IndexTaskTest new IndexTask.IndexIngestionSpec( new DataSchema( "test", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - Arrays.asList("ts"), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ) + jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + Arrays.asList("ts"), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") + ) + ), + Map.class ), new AggregatorFactory[]{ new 
LongSumAggregatorFactory("val", "val") @@ -105,7 +111,8 @@ public class IndexTaskTest Granularity.DAY, QueryGranularity.MINUTE, Arrays.asList(new Interval("2014/2015")) - ) + ), + jsonMapper ), new IndexTask.IndexIOConfig( new LocalFirehoseFactory( @@ -149,21 +156,24 @@ public class IndexTaskTest new IndexTask.IndexIngestionSpec( new DataSchema( "test", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - Arrays.asList("ts"), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ) + jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + Arrays.asList("ts"), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("val", "val") @@ -171,7 +181,8 @@ public class IndexTaskTest new ArbitraryGranularitySpec( QueryGranularity.MINUTE, Arrays.asList(new Interval("2014/2015")) - ) + ), + jsonMapper ), new IndexTask.IndexIOConfig( new LocalFirehoseFactory( @@ -257,21 +268,24 @@ public class IndexTaskTest new IndexTask.IndexIngestionSpec( new DataSchema( "test", - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - Arrays.asList("dim"), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ) + jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null + ), + new DimensionsSpec( + Arrays.asList("dim"), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") + ) + ), + Map.class ), new AggregatorFactory[]{ new LongSumAggregatorFactory("val", "val") @@ -280,7 +294,8 @@ public class IndexTaskTest Granularity.HOUR, QueryGranularity.HOUR, Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z")) - ) + ), + jsonMapper ), new IndexTask.IndexIOConfig( new LocalFirehoseFactory( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 9aadeecac2f..de48656ac53 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -56,7 +56,11 @@ import java.io.IOException; public class TaskSerdeTest { - private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final ObjectMapper jsonMapper; + static { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + } private final IndexSpec indexSpec = new IndexSpec(); @@ -75,7 +79,8 @@ public class TaskSerdeTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P2D")) - ) + ), + jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) @@ -120,7 +125,8 @@ public class TaskSerdeTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P2D")) - ) + ), + jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) @@ -277,7 +283,8 @@ public class TaskSerdeTest 
"foo", null, new AggregatorFactory[0], - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null), + jsonMapper ), new RealtimeIOConfig( new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() @@ -534,7 +541,8 @@ public class TaskSerdeTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")) - ) + ), + jsonMapper ), new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), null ), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 57041757799..4f2fd718e7f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -191,15 +191,15 @@ public class RemoteTaskRunnerTest { doSetup(); - TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1")); + TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper); remoteTaskRunner.run(task1); Assert.assertTrue(taskAnnounced(task1.getId())); mockWorkerRunningTask(task1); - TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")); + TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper); remoteTaskRunner.run(task2); - TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")); + TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper); remoteTaskRunner.run(task3); Assert.assertTrue( @@ -236,15 +236,15 @@ public class RemoteTaskRunnerTest { doSetup(); - TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1")); + TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper); remoteTaskRunner.run(task1); Assert.assertTrue(taskAnnounced(task1.getId())); mockWorkerRunningTask(task1); - TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2")); + TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper); remoteTaskRunner.run(task2); - TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3")); + TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper); remoteTaskRunner.run(task3); Assert.assertTrue(taskAnnounced(task3.getId())); mockWorkerRunningTask(task3); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 5d0f13f3bff..76a2f052ab3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -336,6 +336,7 @@ public class TaskLifecycleTest ); indexSpec = new IndexSpec(); + mapper = new DefaultObjectMapper(); if (taskStorageType.equals("HeapMemoryTaskStorage")) { ts 
= new HeapMemoryTaskStorage( new TaskStorageConfig(null) @@ -344,7 +345,6 @@ public class TaskLifecycleTest ); } else if (taskStorageType.equals("MetadataTaskStorage")) { testDerbyConnector = derbyConnectorRule.getConnector(); - mapper = new DefaultObjectMapper(); mapper.registerSubtypes( new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"), new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory") @@ -479,7 +479,8 @@ public class TaskLifecycleTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P2D")) - ) + ), + mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) @@ -536,7 +537,8 @@ public class TaskLifecycleTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")) - ) + ), + mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) @@ -800,7 +802,8 @@ public class TaskLifecycleTest "test_ds", null, new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, - new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null) + new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), + mapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( new MockFirehoseFactory(true), @@ -863,7 +866,8 @@ public class TaskLifecycleTest Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P2D")) - ) + ), + mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec) diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 5462d04499c..4c28d9bc28f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -46,7 +46,7 @@ public class TaskAnnouncementTest "theid", new TaskResource("rofl", 2), new FireDepartment( - new DataSchema("foo", null, new AggregatorFactory[0], null), + new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()), new RealtimeIOConfig( new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool() { diff --git a/server/src/main/java/io/druid/segment/indexing/DataSchema.java b/server/src/main/java/io/druid/segment/indexing/DataSchema.java index a32dea5277a..b628424b7f5 100644 --- a/server/src/main/java/io/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/io/druid/segment/indexing/DataSchema.java @@ -17,8 +17,11 @@ package io.druid.segment.indexing; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.metamx.common.IAE; @@ -30,6 +33,8 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import java.util.Arrays; +import java.util.Map; import java.util.Set; /** @@ -39,29 +44,68 @@ public class DataSchema private static final Logger log = new Logger(DataSchema.class); private final String dataSource; - 
private final InputRowParser parser; + private final Map parser; private final AggregatorFactory[] aggregators; private final GranularitySpec granularitySpec; + private final ObjectMapper jsonMapper; + @JsonCreator public DataSchema( @JsonProperty("dataSource") String dataSource, - @JsonProperty("parser") InputRowParser parser, + @JsonProperty("parser") Map parser, @JsonProperty("metricsSpec") AggregatorFactory[] aggregators, - @JsonProperty("granularitySpec") GranularitySpec granularitySpec + @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JacksonInject ObjectMapper jsonMapper ) { - Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource."); + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper."); + this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource."); + this.parser = parser; - this.dataSource = dataSource; + if (aggregators.length == 0) { + log.warn("No metricsSpec has been specified. Are you sure this is what you want?"); + } + this.aggregators = aggregators; + + if (granularitySpec == null) { + log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default."); + this.granularitySpec = new UniformGranularitySpec(null, null, null); + } else { + this.granularitySpec = granularitySpec; + } + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty("parser") + public Map getParserMap() + { + return parser; + } + + @JsonIgnore + public InputRowParser getParser() + { + if(parser == null) { + log.warn("No parser has been specified"); + return null; + } + + final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class); final Set dimensionExclusions = Sets.newHashSet(); for (AggregatorFactory aggregator : aggregators) { dimensionExclusions.addAll(aggregator.requiredFields()); } - if (parser != null && parser.getParseSpec() != null) { - final DimensionsSpec dimensionsSpec = parser.getParseSpec().getDimensionsSpec(); - final TimestampSpec timestampSpec = parser.getParseSpec().getTimestampSpec(); + + if (inputRowParser.getParseSpec() != null) { + final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec(); + final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec(); // exclude timestamp from dimensions by default, unless explicitly included in the list of dimensions if (timestampSpec != null) { @@ -84,8 +128,8 @@ public class DataSchema ); } - this.parser = parser.withParseSpec( - parser.getParseSpec() + return inputRowParser.withParseSpec( + inputRowParser.getParseSpec() .withDimensionsSpec( dimensionsSpec .withDimensionExclusions( @@ -94,37 +138,12 @@ public class DataSchema ) ); } else { - this.parser = parser; + return inputRowParser; } } else { - log.warn("No parser or parseSpec has been specified"); - - this.parser = parser; + log.warn("No parseSpec in parser has been specified."); + return inputRowParser; } - - if (aggregators.length == 0) { - log.warn("No metricsSpec has been specified. Are you sure this is what you want?"); - } - this.aggregators = aggregators; - - if (granularitySpec == null) { - log.warn("No granularitySpec has been specified. 
Using UniformGranularitySpec as default."); - this.granularitySpec = new UniformGranularitySpec(null, null, null); - } else { - this.granularitySpec = granularitySpec; - } - } - - @JsonProperty - public String getDataSource() - { - return dataSource; - } - - @JsonProperty - public InputRowParser getParser() - { - return parser; } @JsonProperty("metricsSpec") @@ -141,6 +160,53 @@ public class DataSchema public DataSchema withGranularitySpec(GranularitySpec granularitySpec) { - return new DataSchema(dataSource, parser, aggregators, granularitySpec); + return new DataSchema(dataSource, parser, aggregators, granularitySpec, jsonMapper); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DataSchema that = (DataSchema) o; + + if (!dataSource.equals(that.dataSource)) { + return false; + } + if (parser != null ? !parser.equals(that.parser) : that.parser != null) { + return false; + } + // Probably incorrect - comparing Object[] arrays with Arrays.equals + if (!Arrays.equals(aggregators, that.aggregators)) { + return false; + } + return granularitySpec.equals(that.granularitySpec); + + } + + @Override + public int hashCode() + { + int result = dataSource.hashCode(); + result = 31 * result + (parser != null ? parser.hashCode() : 0); + result = 31 * result + Arrays.hashCode(aggregators); + result = 31 * result + granularitySpec.hashCode(); + return result; + } + + @Override + public String toString() + { + return "DataSchema{" + + "dataSource='" + dataSource + '\'' + + ", parser=" + parser + + ", aggregators=" + Arrays.toString(aggregators) + + ", granularitySpec=" + granularitySpec + + '}'; } } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index c6f139f51aa..724c994d604 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -110,4 +110,33 @@ public class ArbitraryGranularitySpec implements GranularitySpec { return queryGranularity; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o; + + if (!intervals.equals(that.intervals)) { + return false; + } + return !(queryGranularity != null + ? !queryGranularity.equals(that.queryGranularity) + : that.queryGranularity != null); + + } + + @Override + public int hashCode() + { + int result = intervals.hashCode(); + result = 31 * result + (queryGranularity != null ? 
queryGranularity.hashCode() : 0); + return result; + } } diff --git a/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java index ae43d852df9..219d86b649f 100644 --- a/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java +++ b/server/src/test/java/io/druid/segment/indexing/DataSchemaTest.java @@ -17,6 +17,8 @@ package io.druid.segment.indexing; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.metamx.common.IAE; @@ -25,6 +27,7 @@ import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; @@ -32,24 +35,39 @@ import org.junit.Assert; import org.joda.time.Interval; import org.junit.Test; +import java.util.Map; + public class DataSchemaTest { + private final ObjectMapper jsonMapper; + + public DataSchemaTest() + { + jsonMapper = new DefaultObjectMapper(); + jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + } + @Test public void testDefaultExclusions() throws Exception { - DataSchema schema = new DataSchema( - "test", + Map parser = jsonMapper.convertValue( new StringInputRowParser( new JSONParseSpec( new TimestampSpec("time", "auto", null), new DimensionsSpec(ImmutableList.of("dimB", "dimA"), null, null) ) - ), + ), Map.class + ); + + DataSchema schema = new DataSchema( + "test", + parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2"), }, - new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))) + new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))), + jsonMapper ); Assert.assertEquals( @@ -61,19 +79,24 @@ public class DataSchemaTest @Test public void testExplicitInclude() throws Exception { - DataSchema schema = new DataSchema( - "test", + Map parser = jsonMapper.convertValue( new StringInputRowParser( new JSONParseSpec( new TimestampSpec("time", "auto", null), new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "col2"), ImmutableList.of("dimC"), null) ) - ), + ), Map.class + ); + + DataSchema schema = new DataSchema( + "test", + parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2"), }, - new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))) + new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))), + jsonMapper ); Assert.assertEquals( @@ -85,19 +108,101 @@ public class DataSchemaTest @Test(expected = IAE.class) public void testOverlapMetricNameAndDim() throws Exception { - DataSchema schema = new DataSchema( - "test", + Map parser = jsonMapper.convertValue( new StringInputRowParser( new JSONParseSpec( new TimestampSpec("time", "auto", null), new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "metric1"), ImmutableList.of("dimC"), null) ) - ), + ), Map.class + ); + + DataSchema 
schema = new DataSchema( + "test", + parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), new DoubleSumAggregatorFactory("metric2", "col2"), }, - new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))) + new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))), + jsonMapper + ); + schema.getParser(); + } + + @Test + public void testSerdeWithInvalidParserMap() throws Exception + { + String jsonStr = "{" + + "\"dataSource\":\"test\"," + + "\"parser\":{\"type\":\"invalid\"}," + + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + + "\"granularitySpec\":{" + + "\"type\":\"arbitrary\"," + + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"}," + + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}"; + + + //no error on serde as parser is converted to InputRowParser lazily when really needed + DataSchema schema = jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue(jsonStr, DataSchema.class) + ), + DataSchema.class + ); + + try { + schema.getParser(); + Assert.fail("should've failed to get parser."); + } + catch (IllegalArgumentException ex) { + + } + } + + @Test + public void testSerde() throws Exception + { + String jsonStr = "{" + + "\"dataSource\":\"test\"," + + "\"parser\":{" + + "\"type\":\"string\"," + + "\"parseSpec\":{" + + "\"format\":\"json\"," + + "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null}," + + "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[], \"spatialDimensions\":[]}}" + + "}," + + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + + "\"granularitySpec\":{" + + "\"type\":\"arbitrary\"," + + "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"}," + + "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}"; + + DataSchema actual = jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue(jsonStr, DataSchema.class) + ), + DataSchema.class + ); + + Assert.assertEquals( + new DataSchema( + "test", + jsonMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("xXx", null, null), + new DimensionsSpec(null, null, null) + ) + ), Map.class + ), + new AggregatorFactory[]{ + new DoubleSumAggregatorFactory("metric1", "col1") + }, + new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))), + jsonMapper + ), + actual ); } } diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 43631175df7..ebdc7aca3e5 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -17,6 +17,7 @@ package io.druid.segment.realtime; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.Granularity; import io.druid.data.input.impl.DimensionsSpec; @@ -36,6 +37,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Map; /** */ @@ -45,28 +47,33 @@ public class FireDepartmentTest public void testSerde() throws Exception { ObjectMapper jsonMapper = new DefaultObjectMapper(); + 
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); FireDepartment schema = new FireDepartment( new DataSchema( "foo", - new StringInputRowParser( - new JSONParseSpec( - new TimestampSpec( - "timestamp", - "auto", - null - ), - new DimensionsSpec( - Arrays.asList("dim1", "dim2"), - null, - null + jsonMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), + new DimensionsSpec( + Arrays.asList("dim1", "dim2"), + null, + null + ) ) - ) + ), + Map.class ), new AggregatorFactory[]{ new CountAggregatorFactory("count") }, - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null), + jsonMapper ), new RealtimeIOConfig( null, diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index a3bbc0ffb45..2bb2c2ea8fa 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.aggregation.AggregatorFactory; @@ -81,17 +83,21 @@ public class RealtimeManagerTest makeRow(new DateTime().getMillis()) ); + ObjectMapper jsonMapper = new DefaultObjectMapper(); + schema = new DataSchema( "test", null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null), + jsonMapper ); schema2 = new DataSchema( "testV2", null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null), + jsonMapper ); RealtimeIOConfig ioConfig = new RealtimeIOConfig( new FirehoseFactory() diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 0cc211c7a30..5f77f3d4907 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.plumber; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Predicate; import com.google.common.base.Suppliers; import com.google.common.collect.Maps; @@ -34,8 +35,10 @@ import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; +import 
io.druid.jackson.DefaultObjectMapper; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; @@ -66,6 +69,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -112,33 +116,22 @@ public class RealtimePlumberSchoolTest final File tmpDir = Files.createTempDir(); tmpDir.deleteOnExit(); + ObjectMapper jsonMapper = new DefaultObjectMapper(); + schema = new DataSchema( "test", - new InputRowParser() - { - @Override - public InputRow parse(Object input) - { - return null; - } - - @Override - public ParseSpec getParseSpec() - { - return new JSONParseSpec( - new TimestampSpec("timestamp", "auto", null), - new DimensionsSpec(null, null, null) - ); - } - - @Override - public InputRowParser withParseSpec(ParseSpec parseSpec) - { - return null; - } - }, + jsonMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec(null, null, null) + ) + ), + Map.class + ), new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null), + jsonMapper ); announcer = EasyMock.createMock(DataSegmentAnnouncer.class); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index d46dbc82922..925b41baa01 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -23,6 +23,7 @@ import com.metamx.common.Granularity; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.indexing.DataSchema; @@ -48,7 +49,8 @@ public class SinkTest "test", null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, - new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null) + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null), + new DefaultObjectMapper() ); final Interval interval = new Interval("2013-01-01/2013-01-02"); From e8b9ee85a75368b0d91533964f1a947f3637a058 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 31 Aug 2015 22:27:34 -0500 Subject: [PATCH 2/2] Add HadoopyStringInputRowParser to convert stringy Text, BytesWritable, etc. into InputRow --- .../indexer/HadoopDruidIndexerConfig.java | 3 +- .../indexer/HadoopDruidIndexerMapper.java | 6 +- .../indexer/HadoopyStringInputRowParser.java | 69 +++++++++++++++++++ .../druid/indexer/IndexingHadoopModule.java | 50 ++++++++++++++ .../HadoopyStringInputRowParserTest.java | 51 ++++++++++++++ .../druid/indexer/IndexGeneratorJobTest.java | 61 +++++++++++----- 6 files changed, 217 insertions(+), 23 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/HadoopyStringInputRowParser.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/IndexingHadoopModule.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/HadoopyStringInputRowParserTest.java diff --git 
a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 3a5409453fe..4a6807c92aa 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -103,7 +103,8 @@ public class HadoopDruidIndexerConfig binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-indexer", null, null) ); } - } + }, + new IndexingHadoopModule() ) ); jsonMapper = injector.getInstance(ObjectMapper.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index 18d239b734e..e644288df7e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -104,11 +104,9 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< { if (parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before + //HadoopyStringInputRowParser can handle this case as well; this special case is kept + //only for backward compatibility return ((StringInputRowParser) parser).parse(value.toString()); - } else if (parser instanceof StringInputRowParser && value instanceof BytesWritable) { - BytesWritable valueBytes = (BytesWritable) value; - ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()); - return ((StringInputRowParser) parser).parse(valueBuffer); } else if (value instanceof InputRow) { return (InputRow) value; } else { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopyStringInputRowParser.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopyStringInputRowParser.java new file mode 100644 index 00000000000..96d4528aaf0 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopyStringInputRowParser.java @@ -0,0 +1,69 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.IAE; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.StringInputRowParser; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; + +import java.nio.ByteBuffer; + +/** + */ +public class HadoopyStringInputRowParser implements InputRowParser +{ + private final StringInputRowParser parser; + + public HadoopyStringInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec) + { + this.parser = new StringInputRowParser(parseSpec); + } + + @Override + public InputRow parse(Object input) + { + if (input instanceof Text) { + return parser.parse(((Text) input).toString()); + } else if (input instanceof BytesWritable) { + BytesWritable valueBytes = (BytesWritable) input; + return parser.parse(ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength())); + } else { + throw new IAE("can't convert type [%s] to InputRow", input.getClass().getName()); + } + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parser.getParseSpec(); + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new HadoopyStringInputRowParser(parseSpec); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexingHadoopModule.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexingHadoopModule.java new file mode 100644 index 00000000000..7209fac804b --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexingHadoopModule.java @@ -0,0 +1,50 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +/** + */ +public class IndexingHadoopModule implements DruidModule +{ + @Override + public List<? extends Module> getJacksonModules() + { + return Arrays.asList( + new SimpleModule("IndexingHadoopModule") + .registerSubtypes( + new NamedType(HadoopyStringInputRowParser.class, "hadoopyString") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopyStringInputRowParserTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopyStringInputRowParserTest.java new file mode 100644 index 00000000000..a0d034a8f21 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopyStringInputRowParserTest.java @@ -0,0 +1,51 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License.
+*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.data.input.impl.InputRowParser; +import org.junit.Assert; +import org.junit.Test; + + +/** + */ +public class HadoopyStringInputRowParserTest +{ + @Test + public void testSerde() throws Exception + { + String jsonStr = "{" + + "\"type\":\"hadoopyString\"," + + "\"parseSpec\":{\"format\":\"json\",\"timestampSpec\":{\"column\":\"xXx\"},\"dimensionsSpec\":{}}" + + "}"; + + ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper; + InputRowParser parser = jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue(jsonStr, InputRowParser.class) + ), + InputRowParser.class + ); + + Assert.assertTrue(parser instanceof HadoopyStringInputRowParser); + Assert.assertEquals("xXx", parser.getParseSpec().getTimestampSpec().getTimestampColumn()); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 74ba51ccd04..6edb560ae95 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -19,7 +19,6 @@ package io.druid.indexer; -import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Charsets; @@ -30,10 +29,10 @@ import com.google.common.collect.Maps; import com.metamx.common.Granularity; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; -import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -126,7 +125,15 @@ public class IndexGeneratorJobTest "2014102300,i.example.com,963", "2014102300,j.example.com,333" ), - null + null, + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ) }, { false, @@ -160,7 +167,15 @@ public class IndexGeneratorJobTest "2014102216,q.example.com,500", "2014102216,q.example.com,87" ), - null + null, + new HadoopyStringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ) }, { true, @@ -194,7 +209,15 @@ public class IndexGeneratorJobTest "2014102216,q.example.com,500", "2014102216,q.example.com,87" ), - null + null, + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ) }, { false, @@ -238,7 +261,15 @@ public class IndexGeneratorJobTest "2014102300,i.example.com,963", "2014102300,j.example.com,333" ), - SequenceFileInputFormat.class.getName() + SequenceFileInputFormat.class.getName(), + new HadoopyStringInputRowParser( + new CSVParseSpec( + new 
TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "visited_num") + ) + ) } } ); @@ -257,6 +288,7 @@ public class IndexGeneratorJobTest private List<String> data; private boolean useCombiner; private String inputFormatName; + private InputRowParser inputRowParser; public IndexGeneratorJobTest( boolean useCombiner, @@ -264,7 +296,8 @@ public class IndexGeneratorJobTest String interval, Object[][][] shardInfoForEachSegment, List<String> data, - String inputFormatName + String inputFormatName, + InputRowParser inputRowParser ) throws IOException { this.useCombiner = useCombiner; @@ -273,6 +306,7 @@ public class IndexGeneratorJobTest this.interval = new Interval(interval); this.data = data; this.inputFormatName = inputFormatName; + this.inputRowParser = inputRowParser; } private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException { @@ -305,11 +339,9 @@ @Before public void setUp() throws Exception { - mapper = new DefaultObjectMapper(); + mapper = HadoopDruidIndexerConfig.jsonMapper; mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); mapper.registerSubtypes(new NamedType(SingleDimensionShardSpec.class, "single")); - InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper); - mapper.setInjectableValues(inject); dataFile = temporaryFolder.newFile(); tmpDir = temporaryFolder.newFolder(); @@ -332,14 +364,7 @@ new DataSchema( "website", mapper.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec("timestamp", "yyyyMMddHH", null), - new DimensionsSpec(ImmutableList.of("host"), null, null), - null, - ImmutableList.of("timestamp", "host", "visited_num") - ) - ), + inputRowParser, Map.class ), new AggregatorFactory[]{
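For illustration only (not part of this patch): a minimal sketch of how the new "hadoopyString" parser behaves once IndexingHadoopModule registers it. The parseSpec columns and the sample row below are made-up values reusing the shapes from the tests above; the point is that one parseSpec-driven parser accepts both Text and BytesWritable, which is why the BytesWritable special case could be dropped from HadoopDruidIndexerMapper.

package io.druid.indexer;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

public class HadoopyStringParserExample
{
  public static void main(String[] args)
  {
    // Same CSV shape as the test fixtures above; values are illustrative.
    HadoopyStringInputRowParser parser = new HadoopyStringInputRowParser(
        new CSVParseSpec(
            new TimestampSpec("timestamp", "yyyyMMddHH", null),
            new DimensionsSpec(ImmutableList.of("host"), null, null),
            null,
            ImmutableList.of("timestamp", "host", "visited_num")
        )
    );

    String row = "2014102200,a.example.com,100";

    // A Text value (e.g. from TextInputFormat) and a BytesWritable value
    // (e.g. from a SequenceFile) both funnel into the same underlying
    // StringInputRowParser and yield equivalent rows.
    InputRow fromText = parser.parse(new Text(row));
    InputRow fromBytes = parser.parse(new BytesWritable(row.getBytes(Charsets.UTF_8)));

    System.out.println(fromText.getTimestamp() + " " + fromText.getDimension("host"));
    System.out.println(fromBytes.getTimestamp() + " " + fromBytes.getDimension("host"));
  }
}

In a spec file the same parser appears as {"type": "hadoopyString", "parseSpec": {...}}, as exercised by the serde test above.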