From efc92e7ca633fed0b9e8663cded3eecee35502bb Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 20 May 2013 20:19:18 -0700 Subject: [PATCH] bug fixes for segment metadata queries and spatial indexing --- .../druid/query/metadata/ColumnAnalysis.java | 23 ++++++++++++++----- .../v1/SpatialDimensionRowFormatter.java | 10 ++++---- .../task/IndexDeterminePartitionsTask.java | 1 + .../druid/merger/common/task/IndexTask.java | 12 +++++++++- .../merger/common/task/TaskSerdeTest.java | 4 +++- .../merger/coordinator/TaskLifecycleTest.java | 2 ++ .../com/metamx/druid/realtime/Schema.java | 15 ++++++++++++ .../metamx/druid/realtime/plumber/Sink.java | 13 ++++++++--- .../druid/coordination/ServerManager.java | 3 +++ .../v1/QueryableIndexStorageAdapter.java | 5 ---- .../druid/query/metadata/SegmentAnalyzer.java | 6 ++--- 11 files changed, 69 insertions(+), 25 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java b/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java index 01a54a1fbc2..3c98dd514d6 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/ColumnAnalysis.java @@ -31,32 +31,36 @@ public class ColumnAnalysis public static ColumnAnalysis error(String reason) { - return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null); + return new ColumnAnalysis(ValueType.STRING, -1, null, ERROR_PREFIX + reason); } private final String type; private final long size; private final Integer cardinality; + private final String errorMessage; @JsonCreator public ColumnAnalysis( @JsonProperty("type") ValueType type, @JsonProperty("size") long size, - @JsonProperty("cardinality") Integer cardinality + @JsonProperty("cardinality") Integer cardinality, + @JsonProperty("errorMessage") String errorMessage ) { - this(type.name(), size, cardinality); + this(type.name(), size, cardinality, errorMessage); } private ColumnAnalysis( String type, long size, - Integer cardinality + Integer cardinality, + String errorMessage ) { this.type = type; this.size = size; this.cardinality = cardinality; + this.errorMessage = errorMessage; } @JsonProperty @@ -77,9 +81,15 @@ public class ColumnAnalysis return cardinality; } + @JsonProperty + public String getErrorMessage() + { + return errorMessage; + } + public boolean isError() { - return type.startsWith(ERROR_PREFIX); + return (errorMessage != null && !errorMessage.isEmpty()); } public ColumnAnalysis fold(ColumnAnalysis rhs) @@ -103,7 +113,7 @@ public class ColumnAnalysis } } - return new ColumnAnalysis(type, size + rhs.getSize(), cardinality); + return new ColumnAnalysis(type, size + rhs.getSize(), cardinality, null); } @Override @@ -113,6 +123,7 @@ public class ColumnAnalysis "type='" + type + '\'' + ", size=" + size + ", cardinality=" + cardinality + + ", errorMessage='" + errorMessage + '\'' + '}'; } } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java index 033df4e72df..bb2baebf302 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java @@ -64,7 +64,7 @@ public class SpatialDimensionRowFormatter public InputRow formatRow(final InputRow row) { - final Map> finalDimLookup = Maps.newHashMap(); + final Map> spatialLookup = Maps.newHashMap(); // remove all spatial dimensions final List finalDims = Lists.newArrayList( @@ -91,9 +91,6 @@ public class SpatialDimensionRowFormatter } ) ); - for (String dim : finalDims) { - finalDimLookup.put(dim, row.getDimension(dim)); - } InputRow retVal = new InputRow() { @@ -112,7 +109,8 @@ public class SpatialDimensionRowFormatter @Override public List getDimension(String dimension) { - return finalDimLookup.get(dimension); + List retVal = spatialLookup.get(dimension); + return (retVal == null) ? row.getDimension(dimension) : retVal; } @Override @@ -131,7 +129,7 @@ public class SpatialDimensionRowFormatter } spatialDimVals.addAll(dimVals); } - finalDimLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals))); + spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals))); finalDims.add(spatialDimension.getDimName()); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 2d97a86c078..cbe6127de07 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -251,6 +251,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask firehoseFactory, new Schema( schema.getDataSource(), + schema.getSpatialDimensions(), schema.getAggregators(), schema.getIndexGranularity(), shardSpec diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index dbfa13ec204..49eb9e41d97 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -27,13 +27,14 @@ import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import com.metamx.druid.indexer.granularity.GranularitySpec; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.TaskActionClient; -import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.realtime.Schema; +import com.metamx.druid.realtime.firehose.FirehoseFactory; import com.metamx.druid.shard.NoneShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -45,6 +46,9 @@ public class IndexTask extends AbstractTask @JsonIgnore private final GranularitySpec granularitySpec; + @JsonProperty + private final List spatialDimensions; + @JsonIgnore private final AggregatorFactory[] aggregators; @@ -67,6 +71,7 @@ public class IndexTask extends AbstractTask @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + @JsonProperty("spatialDimensions") List spatialDimensions, @JsonProperty("aggregators") AggregatorFactory[] aggregators, @JsonProperty("indexGranularity") QueryGranularity indexGranularity, @JsonProperty("targetPartitionSize") long targetPartitionSize, @@ -85,6 +90,9 @@ public class IndexTask extends AbstractTask ); this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec"); + this.spatialDimensions = (spatialDimensions == null) + ? Lists.newArrayList() + : spatialDimensions; this.aggregators = aggregators; this.indexGranularity = indexGranularity; this.targetPartitionSize = targetPartitionSize; @@ -107,6 +115,7 @@ public class IndexTask extends AbstractTask firehoseFactory, new Schema( getDataSource(), + spatialDimensions, aggregators, indexGranularity, new NoneShardSpec() @@ -125,6 +134,7 @@ public class IndexTask extends AbstractTask firehoseFactory, new Schema( getDataSource(), + spatialDimensions, aggregators, indexGranularity, new NoneShardSpec() diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 91b6f1ba0f7..adbd188be14 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -33,6 +33,7 @@ public class TaskSerdeTest null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), + null, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, 10000, @@ -65,6 +66,7 @@ public class TaskSerdeTest null, new Schema( "foo", + null, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, new NoneShardSpec() @@ -201,7 +203,7 @@ public class TaskSerdeTest { final Task task = new RealtimeIndexTask( null, - new Schema("foo", new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), + new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()), null, null, new Period("PT10M"), diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index 42a556d927c..92ffc0596c7 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -193,6 +193,7 @@ public class TaskLifecycleTest null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), + null, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, 10000, @@ -239,6 +240,7 @@ public class TaskLifecycleTest null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))), + null, new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, QueryGranularity.NONE, 10000, diff --git a/realtime/src/main/java/com/metamx/druid/realtime/Schema.java b/realtime/src/main/java/com/metamx/druid/realtime/Schema.java index 944908a0868..37b9fa2ae79 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/Schema.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/Schema.java @@ -22,18 +22,22 @@ package com.metamx.druid.realtime; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.ShardSpec; import java.util.Arrays; +import java.util.List; /** */ public class Schema { private final String dataSource; + private final List spatialDimensions; private final AggregatorFactory[] aggregators; private final QueryGranularity indexGranularity; private final ShardSpec shardSpec; @@ -41,12 +45,15 @@ public class Schema @JsonCreator public Schema( @JsonProperty("dataSource") String dataSource, + @JsonProperty("spatialDimensions") List spatialDimensions, @JsonProperty("aggregators") AggregatorFactory[] aggregators, @JsonProperty("indexGranularity") QueryGranularity indexGranularity, @JsonProperty("shardSpec") ShardSpec shardSpec ) { this.dataSource = dataSource; + this.spatialDimensions = (spatialDimensions == null) ? Lists.newArrayList() + : spatialDimensions; this.aggregators = aggregators; this.indexGranularity = indexGranularity; this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec; @@ -62,6 +69,12 @@ public class Schema return dataSource; } + @JsonProperty("spatialDimensions") + public List getSpatialDimensions() + { + return spatialDimensions; + } + @JsonProperty public AggregatorFactory[] getAggregators() { @@ -85,8 +98,10 @@ public class Schema { return "Schema{" + "dataSource='" + dataSource + '\'' + + ", spatialDimensions=" + spatialDimensions + ", aggregators=" + (aggregators == null ? null : Arrays.asList(aggregators)) + ", indexGranularity=" + indexGranularity + + ", shardSpec=" + shardSpec + '}'; } } diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java index 039f981e2fe..a5dd4ae38a1 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java @@ -30,6 +30,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IncrementalIndex; +import com.metamx.druid.index.v1.IncrementalIndexSchema; import com.metamx.druid.input.InputRow; import com.metamx.druid.realtime.FireHydrant; import com.metamx.druid.realtime.Schema; @@ -42,7 +43,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; /** -*/ + */ public class Sink implements Iterable { private static final Logger log = new Logger(Sink.class); @@ -145,7 +146,8 @@ public class Sink implements Iterable { return input.getName(); } - }), + } + ), schema.getShardSpec(), null, 0 @@ -155,7 +157,12 @@ public class Sink implements Iterable private FireHydrant makeNewCurrIndex(long minTimestamp, Schema schema) { IncrementalIndex newIndex = new IncrementalIndex( - minTimestamp, schema.getIndexGranularity(), schema.getAggregators() + new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(schema.getIndexGranularity()) + .withSpatialDimensions(schema.getSpatialDimensions()) + .withMetrics(schema.getAggregators()) + .build() ); FireHydrant old; diff --git a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java index ac8f3a65db5..7803475d091 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ServerManager.java +++ b/server/src/main/java/com/metamx/druid/coordination/ServerManager.java @@ -314,6 +314,9 @@ public class ServerManager implements QuerySegmentWalker ); } } + ) + .filter( + Predicates.>notNull() ); return new FinalizeResultsQueryRunner(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest); diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 8e51d3254e4..fb9a81f864b 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -137,11 +137,6 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter public Iterable makeCursors(Filter filter, Interval interval, QueryGranularity gran) { Interval actualInterval = interval; - final Interval indexInterval = getInterval(); - - if (!actualInterval.overlaps(indexInterval)) { - return ImmutableList.of(); - } final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis())); diff --git a/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java b/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java index 24dc1f2b43b..8d5a6136998 100644 --- a/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java +++ b/server/src/main/java/com/metamx/druid/query/metadata/SegmentAnalyzer.java @@ -105,7 +105,7 @@ public class SegmentAnalyzer return ColumnAnalysis.error("multi_value"); } - return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null); + return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null, null); } public ColumnAnalysis analyzeStringColumn(Column column) @@ -125,7 +125,7 @@ public class SegmentAnalyzer } } - return new ColumnAnalysis(capabilities.getType(), size, cardinality); + return new ColumnAnalysis(capabilities.getType(), size, cardinality, null); } return ColumnAnalysis.error("string_no_bitmap"); @@ -153,6 +153,6 @@ public class SegmentAnalyzer size += inputSizeFn.apply(complexColumn.getRowValue(i)); } - return new ColumnAnalysis(capabilities.getType(), size, null); + return new ColumnAnalysis(capabilities.getType(), size, null, null); } }