Merge pull request #1695 from himanshug/allow_hadoop_based_input_row_parser

Allow writing InputRowParser extensions that use Hadoop (or any other) libraries
Charles Allen 2015-09-16 10:16:18 -07:00
commit 42bd4f6049
29 changed files with 755 additions and 255 deletions
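To make the intent concrete before the per-file changes: DataSchema now holds its parser spec as an untyped map and only materializes an InputRowParser on demand, and the Hadoop mapper hands the raw value object straight to whichever parser the spec configured. An extension can therefore ship a parser built on Hadoop (or any other) classes and register it as a Jackson subtype. The sketch below is modeled on the HadoopyStringInputRowParser and IndexingHadoopModule added in this commit; the class names, the package, and the "mapWritable" type key are hypothetical and not part of the commit.

package io.druid.example.hadoop; // hypothetical extension package

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Maps;
import com.google.inject.Binder;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.initialization.DruidModule;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MapWritableInputRowParser implements InputRowParser<Object>
{
  private final ParseSpec parseSpec;

  public MapWritableInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec)
  {
    this.parseSpec = parseSpec;
  }

  @Override
  public InputRow parse(Object input)
  {
    // The mapper passes through whatever the configured InputFormat produced;
    // this sketch assumes a MapWritable and flattens it into a String-keyed event.
    MapWritable writable = (MapWritable) input;
    Map<String, Object> event = Maps.newHashMap();
    for (Map.Entry<Writable, Writable> e : writable.entrySet()) {
      event.put(e.getKey().toString(), e.getValue().toString());
    }
    return new MapBasedInputRow(
        parseSpec.getTimestampSpec().extractTimestamp(event),
        parseSpec.getDimensionsSpec().getDimensions(),
        event
    );
  }

  @JsonProperty
  @Override
  public ParseSpec getParseSpec()
  {
    return parseSpec;
  }

  @Override
  public InputRowParser withParseSpec(ParseSpec parseSpec)
  {
    return new MapWritableInputRowParser(parseSpec);
  }
}

// In a real extension this would be its own public class, listed in
// META-INF/services/io.druid.initialization.DruidModule; it is kept
// package-private here only so the sketch stays a single compilable listing.
class MapWritableParserModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    return Arrays.<Module>asList(
        new SimpleModule("MapWritableParserModule")
            .registerSubtypes(new NamedType(MapWritableInputRowParser.class, "mapWritable"))
    );
  }

  @Override
  public void configure(Binder binder)
  {
  }
}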

View File

@ -103,7 +103,8 @@ public class HadoopDruidIndexerConfig
binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-indexer", null, null)
);
}
}
},
new IndexingHadoopModule()
)
);
jsonMapper = injector.getInstance(ObjectMapper.class);

View File

@ -104,11 +104,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
{
if (parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
//HadoopyStringInputRowParser can handle this and this special case is not needed
//except for backward compatibility
return ((StringInputRowParser) parser).parse(value.toString());
} else if (parser instanceof StringInputRowParser && value instanceof BytesWritable) {
BytesWritable valueBytes = (BytesWritable) value;
ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength());
return ((StringInputRowParser) parser).parse(valueBuffer);
} else if (value instanceof InputRow) {
return (InputRow) value;
} else {
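For reference, here is roughly how the whole dispatch reads after this hunk: the StringInputRowParser/Text branch is kept only for specs written against 0.7.0 and earlier, the BytesWritable special case goes away (HadoopyStringInputRowParser now covers it), and anything else is assumed to be handed straight to the configured parser. The method signature and the body of the final else branch are not shown in the hunk above, so treat this as an approximation, not the committed code.

import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.hadoop.io.Text;

class ParseDispatchSketch
{
  // Approximate post-change shape of the mapper's value dispatch; the final
  // branch is assumed rather than shown in the hunk above.
  static InputRow parseInputRow(Object value, InputRowParser parser)
  {
    if (parser instanceof StringInputRowParser && value instanceof Text) {
      // kept only for backward compatibility with 0.7.0-era specs;
      // HadoopyStringInputRowParser handles Text and BytesWritable itself
      return ((StringInputRowParser) parser).parse(value.toString());
    } else if (value instanceof InputRow) {
      return (InputRow) value;
    } else {
      // assumption: any other raw Hadoop value goes straight to the configured parser
      return parser.parse(value);
    }
  }
}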

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.IAE;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import java.nio.ByteBuffer;
/**
*/
public class HadoopyStringInputRowParser implements InputRowParser<Object>
{
private final StringInputRowParser parser;
public HadoopyStringInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec)
{
this.parser = new StringInputRowParser(parseSpec);
}
@Override
public InputRow parse(Object input)
{
if (input instanceof Text) {
return parser.parse(((Text) input).toString());
} else if (input instanceof BytesWritable) {
BytesWritable valueBytes = (BytesWritable) input;
return parser.parse(ByteBuffer.wrap(valueBytes.getBytes(), 0, valueBytes.getLength()));
} else {
throw new IAE("can't convert type [%s] to InputRow", input.getClass().getName());
}
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parser.getParseSpec();
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new HadoopyStringInputRowParser(parseSpec);
}
}
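A minimal usage sketch (not part of the commit) showing what the wrapper buys you: the same CSV parse spec handles both Hadoop value types, and anything else is rejected with an IAE. The example class name and sample row are made up; the constructors used are the ones that appear in the tests later in this diff.

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.HadoopyStringInputRowParser;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

public class HadoopyStringParserExample
{
  public static void main(String[] args)
  {
    HadoopyStringInputRowParser parser = new HadoopyStringInputRowParser(
        new CSVParseSpec(
            new TimestampSpec("timestamp", "yyyyMMddHH", null),
            new DimensionsSpec(ImmutableList.of("host"), null, null),
            null,
            ImmutableList.of("timestamp", "host", "visited_num")
        )
    );

    // Both Hadoop value types are accepted and produce equivalent rows.
    InputRow fromText = parser.parse(new Text("2014102200,a.example.com,100"));
    InputRow fromBytes = parser.parse(
        new BytesWritable("2014102200,a.example.com,100".getBytes(Charsets.UTF_8))
    );

    System.out.println(fromText.getTimestamp() + " -> " + fromText.getDimension("host"));
    System.out.println(fromBytes.getTimestamp() + " -> " + fromBytes.getDimension("host"));
    // Any other value type, e.g. a LongWritable, throws IAE("can't convert type ... to InputRow").
  }
}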

View File

@ -0,0 +1,50 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
/**
*/
public class IndexingHadoopModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("IndexingHadoopModule")
.registerSubtypes(
new NamedType(HadoopyStringInputRowParser.class, "hadoopyString")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -340,13 +340,16 @@ public class BatchDeltaIngestionTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)
MAPPER.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited_num"),
@ -354,7 +357,8 @@ public class BatchDeltaIngestionTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(INTERVAL_FULL)
)
),
MAPPER
),
new HadoopIOConfig(
inputSpec,

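The test change above follows a pattern that repeats through the rest of this commit: because DataSchema now takes its parser as a Map<String, Object>, the tests build the strongly typed parser as before and then turn it into a map with ObjectMapper.convertValue. A small sketch of that round trip, with illustrative names; DataSchema.getParser() performs the reverse conversion lazily.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.indexer.HadoopDruidIndexerConfig;

import java.util.Map;

public class ParserMapRoundTrip
{
  // Strongly typed parser -> untyped map, as now passed to the DataSchema "parser" field.
  public static Map<String, Object> toMap(ParseSpec parseSpec)
  {
    ObjectMapper mapper = HadoopDruidIndexerConfig.jsonMapper;
    return mapper.convertValue(new StringInputRowParser(parseSpec), Map.class);
  }

  // Untyped map -> parser again; this is what DataSchema.getParser() does on demand,
  // using the polymorphic "type" key (e.g. "string" or "hadoopyString") to pick the class.
  public static InputRowParser fromMap(Map<String, Object> parserMap)
  {
    return HadoopDruidIndexerConfig.jsonMapper.convertValue(parserMap, InputRowParser.class);
  }
}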
View File

@ -108,26 +108,36 @@ public class DetermineHashedPartitionsJobTest
HadoopIngestionSpec ingestionSpec = new HadoopIngestionSpec(
new DataSchema(
"test_schema",
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(ImmutableList.of("market", "quality", "placement", "placementish"), null, null),
"\t",
null,
Arrays.asList("ts",
"market",
"quality",
"placement",
"placementish",
"index")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(
ImmutableList.of("market", "quality", "placement", "placementish"),
null,
null
),
"\t",
null,
Arrays.asList(
"ts",
"market",
"quality",
"placement",
"placementish",
"index"
)
)
),
Map.class
),
new AggregatorFactory[]{new DoubleSumAggregatorFactory("index", "index")},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.NONE,
ImmutableList.of(new Interval(interval))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -223,18 +223,22 @@ public class DeterminePartitionsJobTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -17,6 +17,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@ -45,7 +46,11 @@ import java.util.List;
*/
public class HadoopDruidIndexerConfigTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
public static <T> T jsonReadWriteRead(String s, Class<T> klass)
{
@ -175,12 +180,17 @@ public class HadoopDruidIndexerConfigTest
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo", null, new AggregatorFactory[0], new UniformGranularitySpec(
Granularity.MINUTE,
QueryGranularity.MINUTE,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.MINUTE,
QueryGranularity.MINUTE,
ImmutableList.of(new Interval("2010-01-01/P1D"))
),
jsonMapper
),
new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
new HadoopTuningConfig(
null,
null,

View File

@ -17,15 +17,16 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;
import org.junit.Assert;
@ -33,7 +34,11 @@ import org.junit.Test;
public class HadoopIngestionSpecTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
@Test
public void testGranularitySpec()

View File

@ -19,6 +19,9 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -50,7 +53,15 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
private final String testDatasource = "test";
private final Interval testDatasourceInterval = new Interval("1970/3000");
private final Interval testDatasourceIntervalPartial = new Interval("2050/3000");
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final ObjectMapper jsonMapper;
public HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)
);
}
private static final DataSegment SEGMENT = new DataSegment(
"test1",
@ -155,7 +166,8 @@ public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
ImmutableList.of(
new Interval("2010-01-01/P1D")
)
)
),
jsonMapper
),
new HadoopIOConfig(
jsonMapper.convertValue(datasourcePathSpec, Map.class),

View File

@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.impl.InputRowParser;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class HadoopyStringInputRowParserTest
{
@Test
public void testSerde() throws Exception
{
String jsonStr = "{"
+ "\"type\":\"hadoopyString\","
+ "\"parseSpec\":{\"format\":\"json\",\"timestampSpec\":{\"column\":\"xXx\"},\"dimensionsSpec\":{}}"
+ "}";
ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
InputRowParser parser = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, InputRowParser.class)
),
InputRowParser.class
);
Assert.assertTrue(parser instanceof HadoopyStringInputRowParser);
Assert.assertEquals("xXx", parser.getParseSpec().getTimestampSpec().getTimestampColumn());
}
}

View File

@ -47,6 +47,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
@ -62,13 +63,16 @@ public class IndexGeneratorCombinerTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited"),
@ -76,7 +80,8 @@ public class IndexGeneratorCombinerTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2010/2011"))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -19,7 +19,6 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Charsets;
@ -30,10 +29,10 @@ import com.google.common.collect.Maps;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -126,7 +125,15 @@ public class IndexGeneratorJobTest
"2014102300,i.example.com,963",
"2014102300,j.example.com,333"
),
null
null,
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
},
{
false,
@ -160,7 +167,15 @@ public class IndexGeneratorJobTest
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
),
null
null,
new HadoopyStringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
},
{
true,
@ -194,7 +209,15 @@ public class IndexGeneratorJobTest
"2014102216,q.example.com,500",
"2014102216,q.example.com,87"
),
null
null,
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
},
{
false,
@ -238,7 +261,15 @@ public class IndexGeneratorJobTest
"2014102300,i.example.com,963",
"2014102300,j.example.com,333"
),
SequenceFileInputFormat.class.getName()
SequenceFileInputFormat.class.getName(),
new HadoopyStringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
}
}
);
@ -257,6 +288,7 @@ public class IndexGeneratorJobTest
private List<String> data;
private boolean useCombiner;
private String inputFormatName;
private InputRowParser inputRowParser;
public IndexGeneratorJobTest(
boolean useCombiner,
@ -264,7 +296,8 @@ public class IndexGeneratorJobTest
String interval,
Object[][][] shardInfoForEachSegment,
List<String> data,
String inputFormatName
String inputFormatName,
InputRowParser inputRowParser
) throws IOException
{
this.useCombiner = useCombiner;
@ -273,6 +306,7 @@ public class IndexGeneratorJobTest
this.interval = new Interval(interval);
this.data = data;
this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser;
}
private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
@ -305,11 +339,9 @@ public class IndexGeneratorJobTest
@Before
public void setUp() throws Exception
{
mapper = new DefaultObjectMapper();
mapper = HadoopDruidIndexerConfig.jsonMapper;
mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
mapper.registerSubtypes(new NamedType(SingleDimensionShardSpec.class, "single"));
InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper);
mapper.setInjectableValues(inject);
dataFile = temporaryFolder.newFile();
tmpDir = temporaryFolder.newFolder();
@ -331,13 +363,9 @@ public class IndexGeneratorJobTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
mapper.convertValue(
inputRowParser,
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
@ -345,7 +373,8 @@ public class IndexGeneratorJobTest
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
mapper
),
new HadoopIOConfig(
ImmutableMap.copyOf(inputSpec),

View File

@ -66,18 +66,22 @@ public class JobHelperTest
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
),
Map.class
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -60,6 +60,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
@ -176,20 +177,24 @@ public class DatasourcePathSpecTest
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -156,14 +156,17 @@ public class HadoopConverterJobTest
new HadoopIngestionSpec(
new DataSchema(
DATASOURCE,
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
)
HadoopDruidIndexerConfig.jsonMapper.convertValue(
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)
)
),
Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory(TestIndex.METRICS[0], TestIndex.METRICS[0]),
@ -173,7 +176,8 @@ public class HadoopConverterJobTest
Granularity.MONTH,
QueryGranularity.DAY,
ImmutableList.<Interval>of(interval)
)
),
HadoopDruidIndexerConfig.jsonMapper
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(

View File

@ -17,11 +17,14 @@
package io.druid.indexing.common;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -46,14 +49,15 @@ public class TestRealtimeTask extends RealtimeIndexTask
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status
@JsonProperty("taskStatus") TaskStatus status,
@JacksonInject ObjectMapper mapper
)
{
super(
id,
taskResource,
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), new RealtimeIOConfig(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null, mapper), new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{
@Override

View File

@ -17,6 +17,7 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
@ -56,12 +57,14 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class IndexTaskTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private final IndexSpec indexSpec = new IndexSpec();
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDeterminePartitions() throws Exception
@ -82,21 +85,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -105,7 +111,8 @@ public class IndexTaskTest
Granularity.DAY,
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
@ -149,21 +156,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -171,7 +181,8 @@ public class IndexTaskTest
new ArbitraryGranularitySpec(
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
@ -257,21 +268,24 @@ public class IndexTaskTest
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
@ -280,7 +294,8 @@ public class IndexTaskTest
Granularity.HOUR,
QueryGranularity.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(

View File

@ -56,7 +56,11 @@ import java.io.IOException;
public class TaskSerdeTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final ObjectMapper jsonMapper;
static {
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
private final IndexSpec indexSpec = new IndexSpec();
@ -75,7 +79,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -120,7 +125,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -277,7 +283,8 @@ public class TaskSerdeTest
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
@ -534,7 +541,8 @@ public class TaskSerdeTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
jsonMapper
), new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar"), null, null), null
),
null,

View File

@ -191,15 +191,15 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"));
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"), jsonMapper);
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"));
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"), jsonMapper);
remoteTaskRunner.run(task3);
Assert.assertTrue(
@ -236,15 +236,15 @@ public class RemoteTaskRunnerTest
{
doSetup();
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"));
TestRealtimeTask task1 = new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1"), jsonMapper);
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"));
TestRealtimeTask task2 = new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"), jsonMapper);
remoteTaskRunner.run(task2);
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"));
TestRealtimeTask task3 = new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"), jsonMapper);
remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);

View File

@ -336,6 +336,7 @@ public class TaskLifecycleTest
);
indexSpec = new IndexSpec();
mapper = new DefaultObjectMapper();
if (taskStorageType.equals("HeapMemoryTaskStorage")) {
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
@ -344,7 +345,6 @@ public class TaskLifecycleTest
);
} else if (taskStorageType.equals("MetadataTaskStorage")) {
testDerbyConnector = derbyConnectorRule.getConnector();
mapper = new DefaultObjectMapper();
mapper.registerSubtypes(
new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"),
new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory")
@ -479,7 +479,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -536,7 +537,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P1D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)
@ -800,7 +802,8 @@ public class TaskLifecycleTest
"test_ds",
null,
new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
mapper
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
@ -863,7 +866,8 @@ public class TaskLifecycleTest
Granularity.DAY,
null,
ImmutableList.of(new Interval("2010-01-01/P2D"))
)
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec)

View File

@ -46,7 +46,7 @@ public class TaskAnnouncementTest
"theid",
new TaskResource("rofl", 2),
new FireDepartment(
new DataSchema("foo", null, new AggregatorFactory[0], null),
new DataSchema("foo", null, new AggregatorFactory[0], null, new DefaultObjectMapper()),
new RealtimeIOConfig(
new LocalFirehoseFactory(new File("lol"), "rofl", null), new PlumberSchool()
{

View File

@ -17,8 +17,11 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.metamx.common.IAE;
@ -30,6 +33,8 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
/**
@ -39,29 +44,68 @@ public class DataSchema
private static final Logger log = new Logger(DataSchema.class);
private final String dataSource;
private final InputRowParser parser;
private final Map<String, Object> parser;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final ObjectMapper jsonMapper;
@JsonCreator
public DataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("parser") InputRowParser parser,
@JsonProperty("parser") Map<String, Object> parser,
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JacksonInject ObjectMapper jsonMapper
)
{
Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource.");
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper.");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource.");
this.parser = parser;
this.dataSource = dataSource;
if (aggregators.length == 0) {
log.warn("No metricsSpec has been specified. Are you sure this is what you want?");
}
this.aggregators = aggregators;
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
this.granularitySpec = new UniformGranularitySpec(null, null, null);
} else {
this.granularitySpec = granularitySpec;
}
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty("parser")
public Map<String, Object> getParserMap()
{
return parser;
}
@JsonIgnore
public InputRowParser getParser()
{
if(parser == null) {
log.warn("No parser has been specified");
return null;
}
final InputRowParser inputRowParser = jsonMapper.convertValue(this.parser, InputRowParser.class);
final Set<String> dimensionExclusions = Sets.newHashSet();
for (AggregatorFactory aggregator : aggregators) {
dimensionExclusions.addAll(aggregator.requiredFields());
}
if (parser != null && parser.getParseSpec() != null) {
final DimensionsSpec dimensionsSpec = parser.getParseSpec().getDimensionsSpec();
final TimestampSpec timestampSpec = parser.getParseSpec().getTimestampSpec();
if (inputRowParser.getParseSpec() != null) {
final DimensionsSpec dimensionsSpec = inputRowParser.getParseSpec().getDimensionsSpec();
final TimestampSpec timestampSpec = inputRowParser.getParseSpec().getTimestampSpec();
// exclude timestamp from dimensions by default, unless explicitly included in the list of dimensions
if (timestampSpec != null) {
@ -84,8 +128,8 @@ public class DataSchema
);
}
this.parser = parser.withParseSpec(
parser.getParseSpec()
return inputRowParser.withParseSpec(
inputRowParser.getParseSpec()
.withDimensionsSpec(
dimensionsSpec
.withDimensionExclusions(
@ -94,37 +138,12 @@ public class DataSchema
)
);
} else {
this.parser = parser;
return inputRowParser;
}
} else {
log.warn("No parser or parseSpec has been specified");
this.parser = parser;
log.warn("No parseSpec in parser has been specified.");
return inputRowParser;
}
if (aggregators.length == 0) {
log.warn("No metricsSpec has been specified. Are you sure this is what you want?");
}
this.aggregators = aggregators;
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
this.granularitySpec = new UniformGranularitySpec(null, null, null);
} else {
this.granularitySpec = granularitySpec;
}
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public InputRowParser getParser()
{
return parser;
}
@JsonProperty("metricsSpec")
@ -141,6 +160,53 @@ public class DataSchema
public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
{
return new DataSchema(dataSource, parser, aggregators, granularitySpec);
return new DataSchema(dataSource, parser, aggregators, granularitySpec, jsonMapper);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DataSchema that = (DataSchema) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (parser != null ? !parser.equals(that.parser) : that.parser != null) {
return false;
}
// Probably incorrect - comparing Object[] arrays with Arrays.equals
if (!Arrays.equals(aggregators, that.aggregators)) {
return false;
}
return granularitySpec.equals(that.granularitySpec);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + (parser != null ? parser.hashCode() : 0);
result = 31 * result + Arrays.hashCode(aggregators);
result = 31 * result + granularitySpec.hashCode();
return result;
}
@Override
public String toString()
{
return "DataSchema{" +
"dataSource='" + dataSource + '\'' +
", parser=" + parser +
", aggregators=" + Arrays.toString(aggregators) +
", granularitySpec=" + granularitySpec +
'}';
}
}
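In short, DataSchema now keeps the JSON "parser" block as a Map<String, Object> and only converts it to an InputRowParser when getParser() is called, using an ObjectMapper supplied through @JacksonInject; that is why every test touched by this commit registers the mapper with itself via InjectableValues. A condensed sketch of that wiring, using illustrative values but only constructors that appear elsewhere in this diff:

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;

import java.util.Map;

public class DataSchemaWiringSketch
{
  public static void main(String[] args)
  {
    // The mapper must be able to inject itself so DataSchema's
    // @JacksonInject ObjectMapper parameter can be resolved.
    ObjectMapper jsonMapper = new DefaultObjectMapper();
    jsonMapper.setInjectableValues(
        new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)
    );

    DataSchema schema = new DataSchema(
        "example_ds",
        jsonMapper.convertValue(
            new StringInputRowParser(
                new JSONParseSpec(
                    new TimestampSpec("time", "auto", null),
                    new DimensionsSpec(ImmutableList.of("dimA", "dimB"), null, null)
                )
            ),
            Map.class
        ),
        new AggregatorFactory[]{new DoubleSumAggregatorFactory("metric1", "col1")},
        new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
        jsonMapper
    );

    // The parser map is only materialized here; an invalid map (e.g. an unknown
    // "type") fails at this point instead of when the schema itself is deserialized.
    InputRowParser parser = schema.getParser();
    System.out.println(parser.getParseSpec().getTimestampSpec().getTimestampColumn());
  }
}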

View File

@ -110,4 +110,33 @@ public class ArbitraryGranularitySpec implements GranularitySpec
{
return queryGranularity;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o;
if (!intervals.equals(that.intervals)) {
return false;
}
return !(queryGranularity != null
? !queryGranularity.equals(that.queryGranularity)
: that.queryGranularity != null);
}
@Override
public int hashCode()
{
int result = intervals.hashCode();
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
return result;
}
}

View File

@ -17,6 +17,8 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.metamx.common.IAE;
@ -25,6 +27,7 @@ import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
@ -32,24 +35,39 @@ import org.junit.Assert;
import org.joda.time.Interval;
import org.junit.Test;
import java.util.Map;
public class DataSchemaTest
{
private final ObjectMapper jsonMapper;
public DataSchemaTest()
{
jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
@Test
public void testDefaultExclusions() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("dimB", "dimA"), null, null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
Assert.assertEquals(
@ -61,19 +79,24 @@ public class DataSchemaTest
@Test
public void testExplicitInclude() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "col2"), ImmutableList.of("dimC"), null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
Assert.assertEquals(
@ -85,19 +108,101 @@ public class DataSchemaTest
@Test(expected = IAE.class)
public void testOverlapMetricNameAndDim() throws Exception
{
DataSchema schema = new DataSchema(
"test",
Map<String, Object> parser = jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "metric1"), ImmutableList.of("dimC"), null)
)
),
), Map.class
);
DataSchema schema = new DataSchema(
"test",
parser,
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1"),
new DoubleSumAggregatorFactory("metric2", "col2"),
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015")))
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
);
schema.getParser();
}
@Test
public void testSerdeWithInvalidParserMap() throws Exception
{
String jsonStr = "{"
+ "\"dataSource\":\"test\","
+ "\"parser\":{\"type\":\"invalid\"},"
+ "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+ "\"granularitySpec\":{"
+ "\"type\":\"arbitrary\","
+ "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"},"
+ "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}";
//no error on serde as parser is converted to InputRowParser lazily when really needed
DataSchema schema = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, DataSchema.class)
),
DataSchema.class
);
try {
schema.getParser();
Assert.fail("should've failed to get parser.");
}
catch (IllegalArgumentException ex) {
}
}
@Test
public void testSerde() throws Exception
{
String jsonStr = "{"
+ "\"dataSource\":\"test\","
+ "\"parser\":{"
+ "\"type\":\"string\","
+ "\"parseSpec\":{"
+ "\"format\":\"json\","
+ "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null},"
+ "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[], \"spatialDimensions\":[]}}"
+ "},"
+ "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
+ "\"granularitySpec\":{"
+ "\"type\":\"arbitrary\","
+ "\"queryGranularity\":{\"type\":\"duration\",\"duration\":86400000,\"origin\":\"1970-01-01T00:00:00.000Z\"},"
+ "\"intervals\":[\"2014-01-01T00:00:00.000Z/2015-01-01T00:00:00.000Z\"]}}";
DataSchema actual = jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(jsonStr, DataSchema.class)
),
DataSchema.class
);
Assert.assertEquals(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("xXx", null, null),
new DimensionsSpec(null, null, null)
)
), Map.class
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1")
},
new ArbitraryGranularitySpec(QueryGranularity.DAY, ImmutableList.of(Interval.parse("2014/2015"))),
jsonMapper
),
actual
);
}
}

View File

@ -17,6 +17,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
@ -36,6 +37,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
*/
@ -45,28 +47,33 @@ public class FireDepartmentTest
public void testSerde() throws Exception
{
ObjectMapper jsonMapper = new DefaultObjectMapper();
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
FireDepartment schema = new FireDepartment(
new DataSchema(
"foo",
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim1", "dim2"),
null,
null
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim1", "dim2"),
null,
null
)
)
)
),
Map.class
),
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null),
jsonMapper
),
new RealtimeIOConfig(
null,

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@ -34,6 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
@ -81,17 +83,21 @@ public class RealtimeManagerTest
makeRow(new DateTime().getMillis())
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
schema2 = new DataSchema(
"testV2",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()

View File

@ -19,6 +19,7 @@
package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;
@ -34,8 +35,10 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.Query;
import io.druid.query.QueryRunnerFactory;
@ -66,6 +69,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@ -112,33 +116,22 @@ public class RealtimePlumberSchoolTest
final File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
"test",
new InputRowParser()
{
@Override
public InputRow parse(Object input)
{
return null;
}
@Override
public ParseSpec getParseSpec()
{
return new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
);
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return null;
}
},
jsonMapper.convertValue(
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
)
),
Map.class
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null),
jsonMapper
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);

View File

@ -23,6 +23,7 @@ import com.metamx.common.Granularity;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
@ -48,7 +49,8 @@ public class SinkTest
"test",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null)
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null),
new DefaultObjectMapper()
);
final Interval interval = new Interval("2013-01-01/2013-01-02");