bug fixes for segment metadata queries and spatial indexing

This commit is contained in:
fjy 2013-05-20 20:19:18 -07:00
parent cccd5758de
commit efc92e7ca6
11 changed files with 69 additions and 25 deletions

ColumnAnalysis.java

@@ -31,32 +31,36 @@ public class ColumnAnalysis
   public static ColumnAnalysis error(String reason)
   {
-    return new ColumnAnalysis(ERROR_PREFIX + reason, -1, null);
+    return new ColumnAnalysis(ValueType.STRING, -1, null, ERROR_PREFIX + reason);
   }

   private final String type;
   private final long size;
   private final Integer cardinality;
+  private final String errorMessage;

   @JsonCreator
   public ColumnAnalysis(
       @JsonProperty("type") ValueType type,
       @JsonProperty("size") long size,
-      @JsonProperty("cardinality") Integer cardinality
+      @JsonProperty("cardinality") Integer cardinality,
+      @JsonProperty("errorMessage") String errorMessage
   )
   {
-    this(type.name(), size, cardinality);
+    this(type.name(), size, cardinality, errorMessage);
   }

   private ColumnAnalysis(
       String type,
       long size,
-      Integer cardinality
+      Integer cardinality,
+      String errorMessage
   )
   {
     this.type = type;
     this.size = size;
     this.cardinality = cardinality;
+    this.errorMessage = errorMessage;
   }

   @JsonProperty
@@ -77,9 +81,15 @@ public class ColumnAnalysis
     return cardinality;
   }

+  @JsonProperty
+  public String getErrorMessage()
+  {
+    return errorMessage;
+  }
+
   public boolean isError()
   {
-    return type.startsWith(ERROR_PREFIX);
+    return (errorMessage != null && !errorMessage.isEmpty());
   }

   public ColumnAnalysis fold(ColumnAnalysis rhs)
@@ -103,7 +113,7 @@ public class ColumnAnalysis
       }
     }

-    return new ColumnAnalysis(type, size + rhs.getSize(), cardinality);
+    return new ColumnAnalysis(type, size + rhs.getSize(), cardinality, null);
   }

   @Override
@@ -113,6 +123,7 @@ public class ColumnAnalysis
            "type='" + type + '\'' +
            ", size=" + size +
            ", cardinality=" + cardinality +
+           ", errorMessage='" + errorMessage + '\'' +
            '}';
   }
 }
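Net effect of the ColumnAnalysis change: the failure reason moves out of the type string into a dedicated, serialized errorMessage field, so error analyses keep a real ValueType and isError() no longer depends on string prefixes. A hedged usage sketch of the API as changed above (values illustrative; ERROR_PREFIX is defined elsewhere in the class):

    // Normal analysis: no errorMessage, so isError() is false.
    ColumnAnalysis ok = new ColumnAnalysis(ValueType.STRING, 1024L, 42, null);
    ok.isError();          // false

    // Error analysis: the type stays a real ValueType (STRING) and the
    // reason travels separately.
    ColumnAnalysis bad = ColumnAnalysis.error("multi_value");
    bad.isError();         // true
    bad.getErrorMessage(); // ERROR_PREFIX + "multi_value"

    // Folding accumulates size and deliberately passes null for
    // errorMessage, so a merged analysis never carries error state.
    ColumnAnalysis merged = ok.fold(new ColumnAnalysis(ValueType.STRING, 512L, 40, null));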

SpatialDimensionRowFormatter.java

@@ -64,7 +64,7 @@ public class SpatialDimensionRowFormatter
   public InputRow formatRow(final InputRow row)
   {
-    final Map<String, List<String>> finalDimLookup = Maps.newHashMap();
+    final Map<String, List<String>> spatialLookup = Maps.newHashMap();

     // remove all spatial dimensions
     final List<String> finalDims = Lists.newArrayList(
@@ -91,9 +91,6 @@ public class SpatialDimensionRowFormatter
           }
         )
     );
-    for (String dim : finalDims) {
-      finalDimLookup.put(dim, row.getDimension(dim));
-    }

     InputRow retVal = new InputRow()
     {
@@ -112,7 +109,8 @@ public class SpatialDimensionRowFormatter
       @Override
       public List<String> getDimension(String dimension)
       {
-        return finalDimLookup.get(dimension);
+        List<String> retVal = spatialLookup.get(dimension);
+        return (retVal == null) ? row.getDimension(dimension) : retVal;
       }

       @Override
@@ -131,7 +129,7 @@ public class SpatialDimensionRowFormatter
       }
       spatialDimVals.addAll(dimVals);
     }
-    finalDimLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
+    spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
     finalDims.add(spatialDimension.getDimName());
   }
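The old code pre-copied the row's dimensions into finalDimLookup, so anything not captured in that snapshot was invisible through the wrapper. The rewritten lookup keeps only the synthesized spatial dimensions in spatialLookup and falls back to the underlying row for everything else. A sketch of the resulting behavior, with hypothetical dimension names and assuming a spatial dimension "coord" built from "lat" and "long" (the actual JOINER delimiter is defined elsewhere in the class):

    // Underlying row: lat=["37.4"], long=["-122.1"], page=["foo"]
    InputRow formatted = formatter.formatRow(row);

    formatted.getDimension("coord"); // joined value from spatialLookup,
                                     // e.g. ["37.4,-122.1"] with a comma joiner
    formatted.getDimension("page");  // ["foo"] -- a miss in spatialLookup now
                                     // falls through to row.getDimension()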

IndexDeterminePartitionsTask.java

@@ -251,6 +251,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask
         firehoseFactory,
         new Schema(
             schema.getDataSource(),
+            schema.getSpatialDimensions(),
             schema.getAggregators(),
             schema.getIndexGranularity(),
             shardSpec

IndexTask.java

@@ -27,13 +27,14 @@ import com.google.common.collect.Lists;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.QueryGranularity;
 import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
 import com.metamx.druid.indexer.granularity.GranularitySpec;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.actions.SpawnTasksAction;
 import com.metamx.druid.merger.common.actions.TaskActionClient;
-import com.metamx.druid.realtime.firehose.FirehoseFactory;
 import com.metamx.druid.realtime.Schema;
+import com.metamx.druid.realtime.firehose.FirehoseFactory;
 import com.metamx.druid.shard.NoneShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -45,6 +46,9 @@ public class IndexTask extends AbstractTask
   @JsonIgnore
   private final GranularitySpec granularitySpec;

+  @JsonProperty
+  private final List<SpatialDimensionSchema> spatialDimensions;
+
   @JsonIgnore
   private final AggregatorFactory[] aggregators;
@@ -67,6 +71,7 @@ public class IndexTask extends AbstractTask
       @JsonProperty("id") String id,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
       @JsonProperty("aggregators") AggregatorFactory[] aggregators,
       @JsonProperty("indexGranularity") QueryGranularity indexGranularity,
       @JsonProperty("targetPartitionSize") long targetPartitionSize,
@@ -85,6 +90,9 @@ public class IndexTask extends AbstractTask
     );
     this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec");
+    this.spatialDimensions = (spatialDimensions == null)
+                             ? Lists.<SpatialDimensionSchema>newArrayList()
+                             : spatialDimensions;
     this.aggregators = aggregators;
     this.indexGranularity = indexGranularity;
     this.targetPartitionSize = targetPartitionSize;
@@ -107,6 +115,7 @@ public class IndexTask extends AbstractTask
         firehoseFactory,
         new Schema(
             getDataSource(),
+            spatialDimensions,
             aggregators,
             indexGranularity,
             new NoneShardSpec()
@@ -125,6 +134,7 @@ public class IndexTask extends AbstractTask
         firehoseFactory,
         new Schema(
             getDataSource(),
+            spatialDimensions,
             aggregators,
             indexGranularity,
             new NoneShardSpec()

TaskSerdeTest.java

@@ -33,6 +33,7 @@ public class TaskSerdeTest
         null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
+        null,
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
         QueryGranularity.NONE,
         10000,
@@ -65,6 +66,7 @@ public class TaskSerdeTest
         null,
         new Schema(
             "foo",
+            null,
             new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
             QueryGranularity.NONE,
             new NoneShardSpec()
@@ -201,7 +203,7 @@ public class TaskSerdeTest
   {
     final Task task = new RealtimeIndexTask(
         null,
-        new Schema("foo", new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
+        new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
         null,
         null,
         new Period("PT10M"),

TaskLifecycleTest.java

@@ -193,6 +193,7 @@ public class TaskLifecycleTest
         null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))),
+        null,
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
         QueryGranularity.NONE,
         10000,
@@ -239,6 +240,7 @@ public class TaskLifecycleTest
         null,
         "foo",
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
+        null,
         new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
         QueryGranularity.NONE,
         10000,

Schema.java

@@ -22,18 +22,22 @@ package com.metamx.druid.realtime;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.metamx.druid.QueryGranularity;
 import com.metamx.druid.aggregation.AggregatorFactory;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;
 import com.metamx.druid.shard.NoneShardSpec;
 import com.metamx.druid.shard.ShardSpec;

 import java.util.Arrays;
+import java.util.List;

 /**
 */
 public class Schema
 {
   private final String dataSource;
+  private final List<SpatialDimensionSchema> spatialDimensions;
   private final AggregatorFactory[] aggregators;
   private final QueryGranularity indexGranularity;
   private final ShardSpec shardSpec;
@@ -41,12 +45,15 @@ public class Schema
   @JsonCreator
   public Schema(
       @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
       @JsonProperty("aggregators") AggregatorFactory[] aggregators,
       @JsonProperty("indexGranularity") QueryGranularity indexGranularity,
       @JsonProperty("shardSpec") ShardSpec shardSpec
   )
   {
     this.dataSource = dataSource;
+    this.spatialDimensions = (spatialDimensions == null) ? Lists.<SpatialDimensionSchema>newArrayList()
+                                                         : spatialDimensions;
     this.aggregators = aggregators;
     this.indexGranularity = indexGranularity;
     this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
@@ -62,6 +69,12 @@ public class Schema
     return dataSource;
   }

+  @JsonProperty("spatialDimensions")
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return spatialDimensions;
+  }
+
   @JsonProperty
   public AggregatorFactory[] getAggregators()
   {
@@ -85,8 +98,10 @@ public class Schema
   {
     return "Schema{" +
            "dataSource='" + dataSource + '\'' +
+           ", spatialDimensions=" + spatialDimensions +
            ", aggregators=" + (aggregators == null ? null : Arrays.asList(aggregators)) +
            ", indexGranularity=" + indexGranularity +
+           ", shardSpec=" + shardSpec +
            '}';
   }
 }
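With this change spatialDimensions becomes part of the serialized Schema and flows from the indexing tasks above down to the realtime Sink below. A construction sketch with hypothetical names, assuming SpatialDimensionSchema takes the synthetic dimension's name plus the source dimensions it joins; passing null instead is also safe and normalizes to an empty list:

    Schema schema = new Schema(
        "geo_events",                                     // dataSource (hypothetical)
        Arrays.asList(new SpatialDimensionSchema("coord", Arrays.asList("lat", "long"))),
        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
        QueryGranularity.NONE,
        new NoneShardSpec()
    );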

Sink.java

@@ -30,6 +30,7 @@ import com.metamx.common.logger.Logger;
 import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.v1.IncrementalIndex;
+import com.metamx.druid.index.v1.IncrementalIndexSchema;
 import com.metamx.druid.input.InputRow;
 import com.metamx.druid.realtime.FireHydrant;
 import com.metamx.druid.realtime.Schema;
@@ -42,7 +43,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;

 /**
-*/
+ */
 public class Sink implements Iterable<FireHydrant>
 {
   private static final Logger log = new Logger(Sink.class);
@@ -145,7 +146,8 @@ public class Sink implements Iterable<FireHydrant>
               {
                 return input.getName();
               }
-            }),
+            }
+        ),
         schema.getShardSpec(),
         null,
         0
@@ -155,7 +157,12 @@ public class Sink implements Iterable<FireHydrant>
   private FireHydrant makeNewCurrIndex(long minTimestamp, Schema schema)
   {
     IncrementalIndex newIndex = new IncrementalIndex(
-        minTimestamp, schema.getIndexGranularity(), schema.getAggregators()
+        new IncrementalIndexSchema.Builder()
+            .withMinTimestamp(minTimestamp)
+            .withQueryGranularity(schema.getIndexGranularity())
+            .withSpatialDimensions(schema.getSpatialDimensions())
+            .withMetrics(schema.getAggregators())
+            .build()
     );

     FireHydrant old;
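The constructor swap at the bottom is what actually wires spatial dimensions into the in-memory index: IncrementalIndexSchema.Builder carries the spatial dimension list, and building through it keeps this call site stable as schema fields accumulate. A sketch of the builder on its own (defaults assumed for anything not set; timestamp and granularity values are illustrative):

    IncrementalIndex index = new IncrementalIndex(
        new IncrementalIndexSchema.Builder()
            .withMinTimestamp(new DateTime("2013-01-01").getMillis())
            .withQueryGranularity(QueryGranularity.MINUTE)
            .withSpatialDimensions(schema.getSpatialDimensions())
            .withMetrics(schema.getAggregators())
            .build()
    );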

ServerManager.java

@@ -314,6 +314,9 @@ public class ServerManager implements QuerySegmentWalker
                       );
                     }
                   }
-                );
+                )
+                .filter(
+                    Predicates.<QueryRunner<T>>notNull()
+                );

     return new FinalizeResultsQueryRunner<T>(toolChest.mergeResults(factory.mergeRunners(exec, adapters)), toolChest);
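The added notNull filter guards the merge step: the transform that maps segment descriptors to query runners can yield null (for instance when a requested segment is no longer served here), and without the filter those nulls would flow into mergeRunners. A self-contained Guava sketch of the pattern, with strings standing in for descriptors and runners:

    import com.google.common.base.Function;
    import com.google.common.base.Predicates;
    import com.google.common.collect.Iterables;

    import java.util.Arrays;
    import java.util.List;

    public class NotNullFilterSketch
    {
      public static void main(String[] args)
      {
        List<String> descriptors = Arrays.asList("seg-1", "seg-missing", "seg-2");

        // transform() can produce nulls; filter(notNull()) drops them before
        // anything downstream tries to use the runner.
        Iterable<String> runners = Iterables.filter(
            Iterables.transform(
                descriptors,
                new Function<String, String>()
                {
                  @Override
                  public String apply(String descriptor)
                  {
                    return descriptor.contains("missing") ? null : "runner:" + descriptor;
                  }
                }
            ),
            Predicates.<String>notNull()
        );

        System.out.println(Iterables.toString(runners)); // [runner:seg-1, runner:seg-2]
      }
    }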

QueryableIndexStorageAdapter.java

@@ -137,11 +137,6 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
   public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
   {
     Interval actualInterval = interval;

-    final Interval indexInterval = getInterval();
-
-    if (!actualInterval.overlaps(indexInterval)) {
-      return ImmutableList.of();
-    }

     final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
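One plausible reading of this removal: the segment's nominal interval (getInterval()) and the interval actually covered by rows (min/max timestamps, with the max padded forward by the query granularity) can disagree, so deciding overlap against the former could wrongly yield no cursors. A Joda-Time sketch of the discrepancy (values illustrative, assuming QueryGranularity.HOUR behaves as a one-hour duration granularity):

    Interval nominal = new Interval("2013-01-01/2013-01-02");          // getInterval()
    Interval data = new Interval(
        new DateTime("2013-01-01T00:00:00").getMillis(),
        QueryGranularity.HOUR.next(new DateTime("2013-01-01T23:30:00").getMillis())
    );
    Interval query = new Interval("2013-01-02T00:00:00/2013-01-02T01:00:00");

    query.overlaps(nominal); // false: the nominal interval ends at midnight
    query.overlaps(data);    // true: granularity padding extends past midnight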

SegmentAnalyzer.java

@@ -105,7 +105,7 @@ public class SegmentAnalyzer
       return ColumnAnalysis.error("multi_value");
     }

-    return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null);
+    return new ColumnAnalysis(capabilities.getType(), column.getLength() * numBytes, null, null);
   }

   public ColumnAnalysis analyzeStringColumn(Column column)
@@ -125,7 +125,7 @@ public class SegmentAnalyzer
         }
       }

-      return new ColumnAnalysis(capabilities.getType(), size, cardinality);
+      return new ColumnAnalysis(capabilities.getType(), size, cardinality, null);
     }

     return ColumnAnalysis.error("string_no_bitmap");
@@ -153,6 +153,6 @@ public class SegmentAnalyzer
       size += inputSizeFn.apply(complexColumn.getRowValue(i));
     }

-    return new ColumnAnalysis(capabilities.getType(), size, null);
+    return new ColumnAnalysis(capabilities.getType(), size, null, null);
   }
 }