Lazily deserialize "parser" to InputRowParser in DataSchema

so that user-supplied Hadoop-related InputRowParsers are created only when needed.
This allows the overlord to accept a HadoopIndexTask whose spec uses a Hadoop-specific InputRowParser
without failing just because that parser may need Hadoop libraries the overlord does not have on its classpath.
Himanshu Gupta 2015-08-31 22:24:10 -05:00
parent b464da438c
commit 74f4572bd4
24 changed files with 546 additions and 240 deletions
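
At its core, DataSchema now keeps the "parser" property as a plain Map and only converts it to an InputRowParser, via a Jackson-injected ObjectMapper, when getParser() is called. The condensed sketch below mirrors the DataSchema.java diff further down; the dataSource, metricsSpec, granularitySpec fields and the dimension-exclusion handling are omitted here for brevity.

// Condensed sketch of the new lazy-parser behavior in DataSchema (see the full diff below).
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.impl.InputRowParser;
import java.util.Map;

public class DataSchema
{
  private final Map<String, Object> parser;   // raw "parser" JSON, not yet deserialized
  private final ObjectMapper jsonMapper;      // injected so conversion can happen on demand

  @JsonCreator
  public DataSchema(
      @JsonProperty("parser") Map<String, Object> parser,
      @JacksonInject ObjectMapper jsonMapper
  )
  {
    this.parser = parser;
    this.jsonMapper = jsonMapper;
  }

  @JsonProperty("parser")
  public Map<String, Object> getParserMap()
  {
    return parser;                            // serialized back exactly as received
  }

  @JsonIgnore
  public InputRowParser getParser()
  {
    // Parser classes are only resolved here, so a node that never calls getParser()
    // does not need Hadoop-specific parser classes on its classpath.
    return jsonMapper.convertValue(parser, InputRowParser.class);
  }
}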

View File

@ -340,13 +340,16 @@ public class BatchDeltaIngestionTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)
MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited_num"),
@ -354,7 +357,8 @@ public class BatchDeltaIngestionTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(INTERVAL_FULL)
)
),
MAPPER
),
new HadoopIOConfig(
inputSpec,

View File

@ -108,26 +108,36 @@ public class DetermineHashedPartitionsJobTest
HadoopIngestionSpec ingestionSpec = new HadoopIngestionSpec(
new DataSchema(
"test_schema",
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(ImmutableList.of("market", "quality", "placement", "placementish"), null, null),
"\t",
null,
Arrays.asList("ts",
"market",
"quality",
"placement",
"placementish",
"index")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(
ImmutableList.of("market", "quality", "placement", "placementish"),
null,
null
),
"\t",
null,
Arrays.asList(
"ts",
"market",
"quality",
"placement",
"placementish",
"index"
)
)
),
Map.class
),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.NONE,
ImmutableList.of(new Interval(interval))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -223,18 +223,22 @@ public class DeterminePartitionsJobTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -17,6 +17,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -45,7 +46,11 @@ import java.util.List;
*/
public class HadoopDruidIndexerConfigTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
public static <T> T jsonReadWriteRead(String s, Class<T> klass)
{
@ -175,12 +180,17 @@ public class HadoopDruidIndexerConfigTest
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo", null, new AggregatorFactory[0], new UniformGranularitySpec(
Granularity.MINUTE,
QueryGranularity.MINUTE,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.MINUTE,
QueryGranularity.MINUTE,
ImmutableList.of(new Interval("2010-01-01/P1D"))
),
jsonMapper
),
new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
new HadoopTuningConfig(
null,
null,

View File

@ -17,15 +17,16 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;
import org.junit.Assert;
@ -33,7 +34,11 @@ import org.junit.Test;
public class HadoopIngestionSpecTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
@Test
public void testGranularitySpec()

View File

@ -19,6 +19,9 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -50,7 +53,15 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
private final String testDatasource = "test";
private final Interval testDatasourceInterval = new Interval("1970/3000");
private final Interval testDatasourceIntervalPartial = new Interval("2050/3000");
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final ObjectMapper jsonMapper;
public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)
);
}
private static final DataSegment SEGMENT = new DataSegment(
"test1",
@ -155,7 +166,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
ImmutableList.of(
new Interval("2010-01-01/P1D")
)
)
),
jsonMapper
),
new HadoopIOConfig(
jsonMapper.convertValue(datasourcePathSpec, Map.class),

View File

@ -47,6 +47,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
@ -62,13 +63,16 @@ public class IndexGeneratorCombinerTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
@ -76,7 +80,8 @@ public class IndexGeneratorCombinerTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011"))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -331,13 +331,16 @@ public class IndexGeneratorJobTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
mapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
@ -345,7 +348,8 @@ public class IndexGeneratorJobTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
mapper
),
new HadoopIOConfig(
ImmutableMap.copyOf(inputSpec),

View File

@ -66,18 +66,22 @@ public class JobHelperTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -60,6 +60,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
@ -176,20 +177,24 @@ public class DatasourcePathSpecTest
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -156,14 +156,17 @@ public class HadoopConverterJobTest
new HadoopIngestionSpec(
new DataSchema(
DATASOURCE,
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
)
),
Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]),
@ -173,7 +176,8 @@ public class HadoopConverterJobTest
Granularity.MONTH,
QueryGranularity.DAY,
ImmutableList.<Interval>of(interval)
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -17,11 +17,14 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -46,14 +49,15 @@ public class TestRealtimeTask extends RealtimeIndexTask
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status
@JsonProperty("taskStatus") TaskStatus status,
@JacksonInject ObjectMapper mapper
)
{
super(
id,
taskResource,
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), new RealtimeIOConfig(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper), new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override

View File

@ -17,6 +17,7 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
@ -56,12 +57,14 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class IndexTaskTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private final IndexSpec indexSpec = new IndexSpec();
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDeterminePartitions() throws Exception
@ -82,21 +85,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -105,7 +111,8 @@ public class IndexTaskTest
Granularity.DAY,
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
@ -149,21 +156,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -171,7 +181,8 @@ public class IndexTaskTest
new ArbitraryGranularitySpec(
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
@ -257,21 +268,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -280,7 +294,8 @@ public class IndexTaskTest
Granularity.HOUR,
QueryGranularity.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(

View File

@ -56,7 +56,11 @@ import java.io.IOException;
public class TaskSerdeTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
private final IndexSpec indexSpec = new IndexSpec();
@ -75,7 +79,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -120,7 +125,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -277,7 +283,8 @@ public class TaskSerdeTest
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
@ -534,7 +541,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
jsonMapper
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar"), null, null), null
),
null,

View File

@ -191,15 +191,15 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"));
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper);
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"));
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper);
remoteTaskRunner.run(task3);
Assert.assertTrue(
@ -236,15 +236,15 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"));
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper);
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"));
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper);
remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);

View File

@ -336,6 +336,7 @@ public class TaskLifecycleTest
);
indexSpec = new IndexSpec();
mapper = new DefaultObjectMapper();
if (taskStorageType.equals("HeapMemoryTaskStorage")) {
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
@ -344,7 +345,6 @@ public class TaskLifecycleTest
);
} else if (taskStorageType.equals("MetadataTaskStorage")) {
testDerbyConnector = derbyConnectorRule.getConnector();
mapper = new DefaultObjectMapper();
mapper.registerSubtypes(
new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"),
new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory")
@ -479,7 +479,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -536,7 +537,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -800,7 +802,8 @@ public class TaskLifecycleTest
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
mapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
@ -863,7 +866,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)

View File

@ -46,7 +46,7 @@ public class TaskAnnouncementTest
"theid",
new TaskResource("rofl", 2),
new FireDepartment(
new DataSchema("foo", null, new AggregatorFactory[0], null),
new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{

View File

@ -17,8 +17,11 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.metamx.common.IAE;
@ -30,6 +33,8 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
/**
@ -39,29 +44,68 @@ public class DataSchema
private static final Logger log = new Logger(DataSchema.class);
private final String dataSource;
private final InputRowParser parser;
private final Map<String, Object> parser;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final ObjectMapper jsonMapper;
@JsonCreator
public DataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("parser") InputRowParser parser,
@JsonProperty("parser") Map<String, Object> parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JacksonInject ObjectMapper jsonMapper
)
{
Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource.");
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper.");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource.");
this.parser = parser;
this.dataSource = dataSource;
if (aggregators.length == 0) {
log.warn("No metricsSpec has been specified. Are you sure this is what you want?");
}
this.aggregators = aggregators;
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
this.granularitySpec = new UniformGranularitySpec(null, null, null);
} else {
this.granularitySpec = granularitySpec;
}
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty("parser")
public Map<String, Object> getParserMap()
{
return parser;
}
@JsonIgnore
public InputRowParser getParser()
{
if(parser == null) {
log.warn("No parser has been specified");
return null;
}
final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
final Set<String> dimensionExclusions = Sets.newHashSet();
for (AggregatorFactory aggregator : aggregators) {
dimensionExclusions.addAll(aggregator.requiredFields());
}
if (parser != null && parser.getParseSpec() != null) {
final DimensionsSpec dimensionsSpec = parser.getParseSpec().getDimensionsSpec();
final TimestampSpec timestampSpec = parser.getParseSpec().getTimestampSpec();
if (inputRowParser.getParseSpec() != null) {
final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();
final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();
// exclude timestamp from dimensions by default, unless explicitly included in the list of dimensions
if (timestampSpec != null) {
@ -84,8 +128,8 @@ public class DataSchema
);
}
this.parser = parser.withParseSpec(
parser.getParseSpec()
return inputRowParser.withParseSpec(
inputRowParser.getParseSpec()
.withDimensionsSpec(
dimensionsSpec
.withDimensionExclusions(
@ -94,37 +138,12 @@ public class DataSchema
)
);
} else {
this.parser = parser;
return inputRowParser;
}
} else {
log.warn("No parser or parseSpec has been specified");
this.parser = parser;
log.warn("No parseSpec in parser has been specified.");
return inputRowParser;
}
if (aggregators.length == 0) {
log.warn("No metricsSpec has been specified. Are you sure this is what you want?");
}
this.aggregators = aggregators;
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
this.granularitySpec = new UniformGranularitySpec(null, null, null);
} else {
this.granularitySpec = granularitySpec;
}
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public InputRowParser getParser()
{
return parser;
}
@JsonProperty("metricsSpec")
@ -141,6 +160,53 @@ public class DataSchema
public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
{
return new DataSchema(dataSource, parser, aggregators, granularitySpec);
return new DataSchema(dataSource, parser, aggregators, granularitySpec, jsonMapper);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DataSchema that = (DataSchema) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (parser != null ? !parser.equals(that.parser) : that.parser != null) {
return false;
}
// Probably incorrect - comparing Object[] arrays with Arrays.equals
if (!Arrays.equals(aggregators, that.aggregators)) {
return false;
}
return granularitySpec.equals(that.granularitySpec);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + (parser != null ? parser.hashCode() : 0);
result = 31 * result + Arrays.hashCode(aggregators);
result = 31 * result + granularitySpec.hashCode();
return result;
}
@Override
public String toString()
{
return "DataSchema{" +
"dataSource='" + dataSource + '\'' +
", parser=" + parser +
", aggregators=" + Arrays.toString(aggregators) +
", granularitySpec=" + granularitySpec +
'}';
}
}
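
One consequence visible throughout the test diffs: any ObjectMapper used to deserialize a DataSchema must register itself as an injectable value so the @JacksonInject ObjectMapper above can resolve. A minimal usage sketch, with jsonStr standing in for any DataSchema JSON:

// Minimal setup sketch: register the mapper as its own injectable ObjectMapper so
// @JacksonInject in DataSchema resolves; "parser" stays a Map until getParser() is called.
ObjectMapper jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
DataSchema schema = jsonMapper.readValue(jsonStr, DataSchema.class);
InputRowParser rowParser = schema.getParser();  // conversion (and any failure) happens only here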

View File

@ -110,4 +110,33 @@ public class ArbitraryGranularitySpec implements GranularitySpec
{
return queryGranularity;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o;
if (!intervals.equals(that.intervals)) {
return false;
}
return !(queryGranularity != null
? !queryGranularity.equals(that.queryGranularity)
: that.queryGranularity != null);
}
@Override
public int hashCode()
{
int result = intervals.hashCode();
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
return result;
}
}

View File

@ -17,6 +17,8 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.IAE;
@ -25,6 +27,7 @@ import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
@ -32,24 +35,39 @@ import org.junit.Assert;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.Map;
public class DataSchemaTest
{
private final ObjectMapper jsonMapper;
public DataSchemaTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
@Test
public void testDefaultExclusions() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("dimB", "dimA"), null, null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
Assert.assertEquals(
@ -61,19 +79,24 @@ public class DataSchemaTest
@Test
public void testExplicitInclude() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "col2"), ImmutableList.of("dimC"), null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
Assert.assertEquals(
@ -85,19 +108,101 @@ public class DataSchemaTest
@Test(expected = IAE.class)
public void testOverlapMetricNameAndDim() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "metric1"), ImmutableList.of("dimC"), null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
schema.getParser();
}
@Test
public void testSerdeWithInvalidParserMap() throws Exception
{
String jsonStr = "{"
+ "\"dataSource\":\"test\","
+ "\"parser\":{\"type\":\"invalid\"},"
+ "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+ "\"granularitySpec\":{"
+ "\"type\":\"arbitrary\","
+ "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"},"
+ "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}";
//no error on serde as parser is converted to InputRowParser lazily when really needed
DataSchema schema = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, DataSchema.class)
),
DataSchema.class
);
try {
schema.getParser();
Assert.fail("should've failed to get parser.");
}
catch (IllegalArgumentException ex) {
}
}
@Test
public void testSerde() throws Exception
{
String jsonStr = "{"
+ "\"dataSource\":\"test\","
+ "\"parser\":{"
+ "\"type\":\"string\","
+ "\"parseSpec\":{"
+ "\"format\":\"json\","
+ "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null},"
+ "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[], \"spatialDimensions\":[]}}"
+ "},"
+ "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+ "\"granularitySpec\":{"
+ "\"type\":\"arbitrary\","
+ "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"},"
+ "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}";
DataSchema actual = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, DataSchema.class)
),
DataSchema.class
);
Assert.assertEquals(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("xXx", null, null),
new DimensionsSpec(null, null, null)
)
), Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1")
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
),
actual
);
}
}

View File

@ -17,6 +17,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
@ -36,6 +37,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
*/
@ -45,28 +47,33 @@ public class FireDepartmentTest
public void testSerde() throws Exception
{
ObjectMapper jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
FireDepartment schema = new FireDepartment(
new DataSchema(
"foo",
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim1", "dim2"),
null,
null
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim1", "dim2"),
null,
null
)
)
)
),
Map.class
),
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null),
jsonMapper
),
new RealtimeIOConfig(
null,

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@ -34,6 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
@ -81,17 +83,21 @@ public class RealtimeManagerTest
makeRow(new DateTime().getMillis())
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
schema2 = new DataSchema(
"testV2",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
@ -34,8 +35,10 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
@ -66,6 +69,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -112,33 +116,22 @@ public class RealtimePlumberSchoolTest
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
"test",
new InputRowParser()
{
@Override
public InputRow parse(Object input)
{
return null;
}
@Override
public ParseSpec getParseSpec()
{
return new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
);
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return null;
}
},
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
)
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);

View File

@ -23,6 +23,7 @@ import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
@ -48,7 +49,8 @@ public class SinkTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null),
new DefaultObjectMapper()
);
final Interval interval = new Interval("2013-01-01/2013-01-02");