Support use of DimensionSchema class in DimensionsSpec

jon-wei 2016-03-21 13:12:04 -07:00
parent 527b728f3e
commit a59c9ee1b1
31 changed files with 69 additions and 63 deletions
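In short: DimensionsSpec's first constructor argument changes from a List&lt;String&gt; of dimension names to a List&lt;DimensionSchema&gt;, with DimensionsSpec.getDefaultSchemas() wrapping plain names in default string-typed schemas, and call sites that only need the names move from getDimensions() to getDimensionNames(). A minimal sketch of that migration pattern (an illustrative example, not a file in this commit; it assumes druid-api 0.3.17 on the classpath):

import java.util.Arrays;
import java.util.List;

import io.druid.data.input.impl.DimensionsSpec;

public class DimensionsSpecMigrationExample
{
  public static void main(String[] args)
  {
    // Before this commit: new DimensionsSpec(Arrays.asList("host", "country"), null, null);
    // After it, the first argument is a List<DimensionSchema>; getDefaultSchemas()
    // preserves the old behavior by producing default (string-typed) schemas.
    DimensionsSpec spec = new DimensionsSpec(
        DimensionsSpec.getDefaultSchemas(Arrays.asList("host", "country")),
        null,
        null
    );

    // getDimensions() now returns List<DimensionSchema>; getDimensionNames()
    // recovers the plain List<String> view used by most call sites below.
    List<String> names = spec.getDimensionNames();
    System.out.println(names); // prints [host, country]
  }
}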

View File

@@ -39,7 +39,7 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
)
{
this.parseSpec = parseSpec;
-this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
+this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
}

View File

@@ -43,7 +43,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
)
{
this.parseSpec = parseSpec;
-this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
+this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.avroBytesDecoder = avroBytesDecoder;
}

View File

@@ -79,7 +79,7 @@ public class AvroStreamInputRowParserTest
public static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
public static final TimeAndDimsParseSpec PARSE_SPEC = new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "millis", null),
-new DimensionsSpec(DIMENSIONS, Collections.<String>emptyList(), null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.<String>emptyList(), null)
);
public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
private static final long SUB_LONG_VALUE = 1543698L;

View File

@@ -182,7 +182,7 @@ public class KafkaIndexTaskTest
new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
-ImmutableList.<String>of("dim1", "dim2"),
+DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of("dim1", "dim2")),
null,
null
),

View File

@@ -114,7 +114,7 @@ public class DatasourcePathSpec implements PathSpec
if (updatedIngestionSpec.getDimensions() == null) {
List<String> dims;
if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
-dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
+dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensionNames();
} else {
Set<String> dimSet = Sets.newHashSet(
Iterables.concat(
View File

@@ -346,7 +346,7 @@ public class BatchDeltaIngestionTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)

View File

@@ -112,7 +112,7 @@ public class DetermineHashedPartitionsJobTest
new DelimitedParseSpec(
new TimestampSpec("ts", null, null),
new DimensionsSpec(
-ImmutableList.of("market", "quality", "placement", "placementish"),
+DimensionsSpec.getDefaultSchemas(ImmutableList.of("market", "quality", "placement", "placementish")),
null,
null
),

View File

@@ -227,7 +227,7 @@ public class DeterminePartitionsJobTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country")), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)

View File

@@ -67,7 +67,7 @@ public class IndexGeneratorCombinerTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)

View File

@@ -140,7 +140,7 @@ public class IndexGeneratorJobTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
@@ -185,7 +185,7 @@ public class IndexGeneratorJobTest
new HadoopyStringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
@@ -230,7 +230,7 @@ public class IndexGeneratorJobTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
@@ -285,7 +285,7 @@ public class IndexGeneratorJobTest
new HadoopyStringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
@@ -345,7 +345,7 @@ public class IndexGeneratorJobTest
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), null, null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("B", "F", "M", "Q", "X", "Y")), null, null)
)
),
1, // force 1 row max per index for easier testing

View File

@@ -72,7 +72,7 @@ public class JobHelperTest
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited_num")
)

View File

@@ -160,7 +160,7 @@ public class HadoopConverterJobTest
new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
-new DimensionsSpec(Arrays.asList(TestIndex.DIMENSIONS), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(TestIndex.DIMENSIONS)), null, null),
"\t",
"\u0001",
Arrays.asList(TestIndex.COLUMNS)

View File

@@ -151,7 +151,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
if (dimensions != null) {
dims = dimensions;
} else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
-dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensions();
+dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
} else {
Set<String> dimSet = Sets.newHashSet(
Iterables.concat(

View File

@@ -111,7 +111,7 @@ public class IndexTaskTest
null
),
new DimensionsSpec(
-Arrays.asList("ts"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
@@ -183,7 +183,7 @@ public class IndexTaskTest
null
),
new DimensionsSpec(
-Arrays.asList("ts"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
@@ -289,7 +289,7 @@ public class IndexTaskTest
null
),
new DimensionsSpec(
-Arrays.asList("dim"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("dim")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),

View File

@@ -288,7 +288,7 @@ public class IngestSegmentFirehoseFactoryTest
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
-ImmutableList.<String>of(),
+DimensionsSpec.getDefaultSchemas(ImmutableList.<String>of()),
ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
ImmutableList.<SpatialDimensionSchema>of()
)
@@ -406,7 +406,7 @@ public class IngestSegmentFirehoseFactoryTest
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
-ImmutableList.of(DIM_NAME),
+DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)),
ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
ImmutableList.<SpatialDimensionSchema>of()
)

View File

@@ -97,7 +97,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
new JSONParseSpec(
new TimestampSpec(TIME_COLUMN, "auto", null),
new DimensionsSpec(
-Arrays.asList(DIMENSIONS),
+DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)),
null,
null
)

View File

@@ -62,7 +62,7 @@
<apache.curator.version>2.9.1</apache.curator.version>
<jetty.version>9.2.5.v20141112</jetty.version>
<jersey.version>1.19</jersey.version>
-<druid.api.version>0.3.16</druid.api.version>
+<druid.api.version>0.3.17</druid.api.version>
<!-- Watch out for Hadoop compatibility when updating to >= 2.5; see https://github.com/druid-io/druid/pull/1669 -->
<jackson.version>2.4.6</jackson.version>
<log4j.version>2.5</log4j.version>

View File

@@ -130,7 +130,8 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
@Override
public TopNResultBuilder addEntry(DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor)
{
-String dimensionValue = dimensionAndMetricValueExtractor.getStringDimensionValue(dimSpec.getOutputName());
+Object dimensionValueObj = dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName());
+String dimensionValue = dimensionValueObj.toString();
if (shouldAdd(dimensionValue)) {
pQueue.add(

View File

@@ -352,7 +352,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
retVal.add(input.getTimestamp().getMillis());
for (DimensionAndMetricValueExtractor result : results) {
List<Object> vals = Lists.newArrayListWithCapacity(aggFactoryNames.length + 2);
-vals.add(result.getStringDimensionValue(query.getDimensionSpec().getOutputName()));
+vals.add(result.getDimensionValue(query.getDimensionSpec().getOutputName()));
for (String aggName : aggFactoryNames) {
vals.add(result.getMetric(aggName));
}
@@ -488,7 +488,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
)
{
String dimOutputName = topNQuery.getDimensionSpec().getOutputName();
-String dimValue = input.getStringDimensionValue(dimOutputName);
+Object dimValue = input.getDimensionValue(dimOutputName);
Map<String, Object> map = input.getBaseObject();
map.put(
dimOutputName,

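The two topN changes above replace getStringDimensionValue() with getDimensionValue(), which returns Object: with typed dimension schemas a result value may be a Long or Float rather than a String. A hedged sketch of the resulting call-site pattern, using a plain Map as an illustrative stand-in for DimensionAndMetricValueExtractor:

import java.util.HashMap;
import java.util.Map;

public class DimensionValueExample
{
  public static void main(String[] args)
  {
    Map<String, Object> row = new HashMap<>();
    row.put("dim", 42L); // a long-typed dimension value

    // The old style assumed String: (String) row.get("dim") would throw
    // ClassCastException here. The new style fetches the raw Object and
    // converts explicitly where a String is required, as
    // TopNLexicographicResultBuilder now does with toString().
    Object raw = row.get("dim");
    String asString = raw == null ? null : raw.toString();
    System.out.println(asString); // prints 42
  }
}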
View File

@@ -36,7 +36,9 @@ import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
+import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.NewSpatialDimensionSchema;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@@ -82,13 +84,16 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
{
private volatile DateTime maxIngestedEventTime;
-private static final Map<String, ValueType> TYPE_MAP = ImmutableMap.<String, ValueType>builder()
-.put("Long[]", ValueType.LONG)
-.put("Double[]", ValueType.FLOAT)
-.put("String[]", ValueType.STRING)
-.put("Long", ValueType.LONG)
-.put("Double", ValueType.FLOAT)
-.put("String", ValueType.STRING)
+// Used to discover ValueType based on the class of values in a row
+// Also used to convert between the duplicate ValueType enums in DimensionSchema (druid-api) and main druid.
+private static final Map<Object, ValueType> TYPE_MAP = ImmutableMap.<Object, ValueType>builder()
+.put(Long.class, ValueType.LONG)
+.put(Double.class, ValueType.FLOAT)
+.put(Float.class, ValueType.FLOAT)
+.put(String.class, ValueType.STRING)
+.put(DimensionSchema.ValueType.LONG, ValueType.LONG)
+.put(DimensionSchema.ValueType.FLOAT, ValueType.FLOAT)
+.put(DimensionSchema.ValueType.STRING, ValueType.STRING)
.build();
private static final Function<Object, String> STRING_TRANSFORMER = new Function<Object, String>()
@@ -404,11 +409,17 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
this.dimensionDescs = Maps.newLinkedHashMap();
this.dimValues = Collections.synchronizedList(Lists.<DimDim>newArrayList());
-for (String dimension : dimensionsSpec.getDimensions()) {
+for (DimensionSchema dimSchema : dimensionsSpec.getDimensions()) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
-capabilities.setType(ValueType.STRING);
-addNewDimension(dimension, capabilities);
-columnCapabilities.put(dimension, capabilities);
+ValueType type = TYPE_MAP.get(dimSchema.getValueType());
+capabilities.setType(type);
+if (dimSchema.getTypeName().equals(DimensionSchema.SPATIAL_TYPE_NAME)) {
+capabilities.setHasSpatialIndexes(true);
+} else {
+addNewDimension(dimSchema.getName(), capabilities);
+}
+columnCapabilities.put(dimSchema.getName(), capabilities);
}
// This should really be more generic
@@ -416,12 +427,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
-for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
-ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
-capabilities.setType(ValueType.STRING);
-capabilities.setHasSpatialIndexes(true);
-columnCapabilities.put(spatialDimension.getDimName(), capabilities);
-}
}
private DimDim newDimDim(String dimension, ValueType type) {
@@ -511,7 +516,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return null;
}
-return TYPE_MAP.get(singleVal.getClass().getSimpleName());
+return TYPE_MAP.get(singleVal.getClass());
}
private List<Comparable> getRowDimensionAsComparables(InputRow row, String dimension, ValueType type)
@@ -596,7 +601,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
-// TODO: For schemaless type discovery, assume everything is a String for now, can change later.
+// For schemaless type discovery, assume everything is a String for now, can change later.
//valType = getTypeFromDimVal(row.getRaw(dimension));
if (valType == null) {
valType = ValueType.STRING;

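Two points in the IncrementalIndex changes above: TYPE_MAP is now keyed both by value classes and by DimensionSchema.ValueType constants, bridging the duplicate ValueType enums in druid-api and core Druid, and spatial dimensions are handled inside the main schema loop via DimensionSchema.SPATIAL_TYPE_NAME instead of a separate pass. A minimal sketch of the class-keyed lookup, with a stand-in enum rather than Druid's own types:

import java.util.HashMap;
import java.util.Map;

public class TypeMapExample
{
  // Stand-in for Druid's internal ValueType enum.
  enum ValueType { LONG, FLOAT, STRING }

  // Keying by Class (instead of getSimpleName() strings, as before this
  // commit) skips a String allocation per lookup and cannot collide with
  // an unrelated class that shares a simple name.
  static final Map<Class<?>, ValueType> TYPE_MAP = new HashMap<>();

  static {
    TYPE_MAP.put(Long.class, ValueType.LONG);
    TYPE_MAP.put(Double.class, ValueType.FLOAT);
    TYPE_MAP.put(Float.class, ValueType.FLOAT);
    TYPE_MAP.put(String.class, ValueType.STRING);
  }

  public static void main(String[] args)
  {
    Object singleVal = 123L;
    // Mirrors the new lookup in getTypeFromDimVal() above.
    System.out.println(TYPE_MAP.get(singleVal.getClass())); // prints LONG
  }
}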
View File

@@ -59,7 +59,7 @@ public class ProtoBufInputRowParserTest
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "iso", null),
-new DimensionsSpec(Arrays.asList(DIMENSIONS), Arrays.<String>asList(), null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), Arrays.<String>asList(), null)
),
"prototest.desc"
);

View File

@@ -106,7 +106,7 @@ public class MultiValuedDimensionTest
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "iso", null),
-new DimensionsSpec(ImmutableList.of("product", "tags"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags")), null, null),
"\t",
ImmutableList.of("timestamp", "product", "tags")
),

View File

@@ -273,7 +273,7 @@ public class IndexIOTest
)
.withDimensionsSpec(
new DimensionsSpec(
-Arrays.asList("dim0", "dim1"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)
@@ -295,7 +295,7 @@ public class IndexIOTest
)
.withDimensionsSpec(
new DimensionsSpec(
-Arrays.asList("dim0", "dim1"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)

View File

@@ -875,7 +875,7 @@ public class IndexMergerTest
public void testMergeWithDimensionsList() throws Exception
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
-.withDimensionsSpec(new DimensionsSpec(Arrays.asList("dimA", "dimB", "dimC"), null, null))
+.withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("dimA", "dimB", "dimC")), null, null))
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
@@ -1622,7 +1622,7 @@ public class IndexMergerTest
IncrementalIndexSchema schema = new IncrementalIndexSchema(
0L,
QueryGranularity.NONE,
-new DimensionsSpec(dims, null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);

View File

@@ -200,7 +200,7 @@ public class TestIndex
final StringInputRowParser parser = new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
-new DimensionsSpec(Arrays.asList(DIMENSIONS), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), null, null),
"\t",
"\u0001",
Arrays.asList(COLUMNS)

View File

@@ -621,7 +621,7 @@ public class IncrementalIndexTest
)
.withDimensionsSpec(
new DimensionsSpec(
-Arrays.asList("dim0", "dim1"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
null,
null
)

View File

@@ -122,7 +122,7 @@ public class DataSchema
for (AggregatorFactory aggregator : aggregators) {
metSet.add(aggregator.getName());
}
-final Set<String> dimSet = Sets.newHashSet(dimensionsSpec.getDimensions());
+final Set<String> dimSet = Sets.newHashSet(dimensionsSpec.getDimensionNames());
final Set<String> overlap = Sets.intersection(metSet, dimSet);
if (!overlap.isEmpty()) {
throw new IAE(

View File

@@ -57,7 +57,7 @@ public class DataSchemaTest
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
-new DimensionsSpec(ImmutableList.of("dimB", "dimA"), null, null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null)
)
), new TypeReference<Map<String, Object>>() {}
);
@@ -86,7 +86,7 @@ public class DataSchemaTest
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
-new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "col2"), ImmutableList.of("dimC"), null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "col2")), ImmutableList.of("dimC"), null)
)
), new TypeReference<Map<String, Object>>() {}
);
@@ -115,7 +115,7 @@ public class DataSchemaTest
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("time", "auto", null),
-new DimensionsSpec(ImmutableList.of("time", "dimA", "dimB", "metric1"), ImmutableList.of("dimC"), null)
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("time", "dimA", "dimB", "metric1")), ImmutableList.of("dimC"), null)
)
), new TypeReference<Map<String, Object>>() {}
);
@@ -173,7 +173,7 @@ public class DataSchemaTest
+ "\"parseSpec\":{"
+ "\"format\":\"json\","
+ "\"timestampSpec\":{\"column\":\"xXx\", \"format\": \"auto\", \"missingValue\": null},"
-+ "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[], \"spatialDimensions\":[]},"
++ "\"dimensionsSpec\":{\"dimensions\":[], \"dimensionExclusions\":[]},"
+ "\"flattenSpec\":{\"useFieldDiscovery\":true, \"fields\":[]},"
+ "\"featureSpec\":{}},"
+ "\"encoding\":\"UTF-8\""

View File

@@ -83,7 +83,7 @@ public class FireDepartmentTest
null
),
new DimensionsSpec(
-Arrays.asList("dim1", "dim2"),
+DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2")),
null,
null
)

View File

@@ -82,7 +82,7 @@ public class EventReceiverFirehoseTest
"timestamp",
"auto",
null
-), new DimensionsSpec(ImmutableList.of("d1"), null, null)
+), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null)
)
)
);
@@ -212,7 +212,7 @@ public class EventReceiverFirehoseTest
"timestamp",
"auto",
null
-), new DimensionsSpec(ImmutableList.of("d1"), null, null)
+), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null)
)
)
);

View File

@@ -107,7 +107,7 @@ public class IngestSegmentFirehoseTest
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
-new DimensionsSpec(ImmutableList.of("host"), null, null),
+new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
),