Merge pull request #2305 from gianm/segment-metadata-query-multivalues

Add StorageAdapter#getColumnTypeName, and various SegmentMetadataQuery adjustments
Fangjin Yang 2016-01-21 17:22:34 -08:00
commit 5a9cd89059
19 changed files with 423 additions and 304 deletions

View File

@ -39,20 +39,23 @@ The format of the result is:
"id" : "some_id",
"intervals" : [ "2013-05-13T00:00:00.000Z/2013-05-14T00:00:00.000Z" ],
"columns" : {
"__time" : { "type" : "LONG", "size" : 407240380, "cardinality" : null },
"dim1" : { "type" : "STRING", "size" : 100000, "cardinality" : 1944 },
"dim2" : { "type" : "STRING", "size" : 100000, "cardinality" : 1504 },
"metric1" : { "type" : "FLOAT", "size" : 100000, "cardinality" : null }
"__time" : { "type" : "LONG", "hasMultipleValues" : false, "size" : 407240380, "cardinality" : null, "errorMessage" : null },
"dim1" : { "type" : "STRING", "hasMultipleValues" : false, "size" : 100000, "cardinality" : 1944, "errorMessage" : null },
"dim2" : { "type" : "STRING", "hasMultipleValues" : true, "size" : 100000, "cardinality" : 1504, "errorMessage" : null },
"metric1" : { "type" : "FLOAT", "hasMultipleValues" : false, "size" : 100000, "cardinality" : null, "errorMessage" : null }
},
"size" : 300000,
"numRows" : 5000000
} ]
```
Dimension columns will have type `STRING`.
Metric columns will have type `FLOAT` or `LONG`, or the name of the underlying complex type (such as `hyperUnique`) in the case of a COMPLEX metric.
The timestamp column will have type `LONG`.
If the `errorMessage` field is non-null, you should not trust the other fields in the response. Their contents are
undefined.
Only columns which are dimensions (i.e., have type `STRING`) will have a cardinality. The rest of the columns (the timestamp and metric columns) will show cardinality as `null`.
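A query producing a result like the one above can also be built programmatically. The snippet below is a non-authoritative sketch that assumes the `Druids` query builder and `ListColumnIncluderator` used by this PR's tests; the datasource, interval, and column names are placeholders.
```
// Sketch only: builds a segmentMetadata query that requests cardinality analysis
// for two placeholder columns and asks for per-segment results to be merged.
SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
    .dataSource("testing")                       // placeholder datasource
    .intervals("2013/2014")                      // placeholder interval
    .toInclude(new ListColumnIncluderator(Arrays.asList("dim1", "dim2")))
    .analysisTypes(SegmentMetadataQuery.AnalysisType.CARDINALITY)
    .merge(true)
    .build();
```
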
### intervals

View File

@ -22,28 +22,29 @@ package io.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import java.util.Collections;
import javax.annotation.Nullable;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SegmentAnalyzer
{
@ -59,33 +60,54 @@ public class SegmentAnalyzer
*/
private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
public Map<String, ColumnAnalysis> analyze(
QueryableIndex index,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
)
private final EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
public SegmentAnalyzer(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
Preconditions.checkNotNull(index, "Index cannot be null");
this.analysisTypes = analysisTypes;
}
public int numRows(Segment segment)
{
return Preconditions.checkNotNull(segment, "segment").asStorageAdapter().getNumRows();
}
public Map<String, ColumnAnalysis> analyze(Segment segment)
{
Preconditions.checkNotNull(segment, "segment");
// index is null for incremental-index-based segments, but storageAdapter is always available
final QueryableIndex index = segment.asQueryableIndex();
final StorageAdapter storageAdapter = segment.asStorageAdapter();
// get length and column names from storageAdapter
final int length = storageAdapter.getNumRows();
final Set<String> columnNames = Sets.newHashSet();
Iterables.addAll(columnNames, storageAdapter.getAvailableDimensions());
Iterables.addAll(columnNames, storageAdapter.getAvailableMetrics());
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
for (String columnName : index.getColumnNames()) {
final Column column = index.getColumn(columnName);
final ColumnCapabilities capabilities = column.getCapabilities();
for (String columnName : columnNames) {
final Column column = index == null ? null : index.getColumn(columnName);
final ColumnCapabilities capabilities = column != null
? column.getCapabilities()
: storageAdapter.getColumnCapabilities(columnName);
final ColumnAnalysis analysis;
final ValueType type = capabilities.getType();
switch (type) {
case LONG:
analysis = analyzeLongColumn(column, analysisTypes);
analysis = analyzeNumericColumn(capabilities, length, Longs.BYTES);
break;
case FLOAT:
analysis = analyzeFloatColumn(column, analysisTypes);
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
break;
case STRING:
analysis = analyzeStringColumn(column, analysisTypes);
analysis = analyzeStringColumn(capabilities, column, storageAdapter.getDimensionCardinality(columnName));
break;
case COMPLEX:
analysis = analyzeComplexColumn(column, analysisTypes);
analysis = analyzeComplexColumn(capabilities, column, storageAdapter.getColumnTypeName(columnName));
break;
default:
log.warn("Unknown column type[%s].", type);
@ -95,201 +117,122 @@ public class SegmentAnalyzer
columns.put(columnName, analysis);
}
// Add time column too
ColumnCapabilities timeCapabilities = storageAdapter.getColumnCapabilities(Column.TIME_COLUMN_NAME);
if (timeCapabilities == null) {
timeCapabilities = new ColumnCapabilitiesImpl().setType(ValueType.LONG).setHasMultipleValues(false);
}
columns.put(
Column.TIME_COLUMN_NAME,
lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP, analysisTypes)
analyzeNumericColumn(timeCapabilities, length, NUM_BYTES_IN_TIMESTAMP)
);
return columns;
}
public Map<String, ColumnAnalysis> analyze(
StorageAdapter adapter,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
public boolean analyzingSize()
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
}
public boolean analyzingCardinality()
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
}
private ColumnAnalysis analyzeNumericColumn(
final ColumnCapabilities capabilities,
final int length,
final int sizePerRow
)
{
Preconditions.checkNotNull(adapter, "Adapter cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
List<String> columnNames = getStorageAdapterColumnNames(adapter);
int numRows = adapter.getNumRows();
for (String columnName : columnNames) {
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName);
final ColumnAnalysis analysis;
/**
* StorageAdapter doesn't provide a way to get column values, so size is
* not calculated for STRING and COMPLEX columns.
*/
ValueType capType = capabilities.getType();
switch (capType) {
case LONG:
analysis = lengthBasedAnalysisForAdapter(
analysisTypes,
capType.name(), capabilities,
numRows, Longs.BYTES
);
break;
case FLOAT:
analysis = lengthBasedAnalysisForAdapter(
analysisTypes,
capType.name(), capabilities,
numRows, NUM_BYTES_IN_TEXT_FLOAT
);
break;
case STRING:
analysis = new ColumnAnalysis(
capType.name(),
0,
analysisHasCardinality(analysisTypes) ? adapter.getDimensionCardinality(columnName) : 0,
null
);
break;
case COMPLEX:
analysis = new ColumnAnalysis(
capType.name(),
0,
null,
null
);
break;
default:
log.warn("Unknown column type[%s].", capType);
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType));
}
columns.put(columnName, analysis);
}
columns.put(
Column.TIME_COLUMN_NAME,
lengthBasedAnalysisForAdapter(analysisTypes, ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP)
);
return columns;
}
public ColumnAnalysis analyzeLongColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
return lengthBasedAnalysis(column, Longs.BYTES, analysisTypes);
}
public ColumnAnalysis analyzeFloatColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT, analysisTypes);
}
private ColumnAnalysis lengthBasedAnalysis(
Column column,
final int numBytes,
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
int size = 0;
if (analysisHasSize(analysisTypes)) {
size = column.getLength() * numBytes;
}
return new ColumnAnalysis(capabilities.getType().name(), size, null, null);
}
public ColumnAnalysis analyzeStringColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
final ColumnCapabilities capabilities = column.getCapabilities();
if (capabilities.hasBitmapIndexes()) {
final BitmapIndex bitmapIndex = column.getBitmapIndex();
int cardinality = bitmapIndex.getCardinality();
long size = 0;
if (analysisHasSize(analysisTypes)) {
for (int i = 0; i < cardinality; ++i) {
String value = bitmapIndex.getValue(i);
if (value != null) {
size += StringUtils.toUtf8(value).length * bitmapIndex.getBitmap(value).size();
}
}
}
return new ColumnAnalysis(
capabilities.getType().name(),
size,
analysisHasCardinality(analysisTypes) ? cardinality : 0,
null
);
}
return ColumnAnalysis.error("string_no_bitmap");
}
public ColumnAnalysis analyzeComplexColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
{
final ColumnCapabilities capabilities = column.getCapabilities();
final ComplexColumn complexColumn = column.getComplexColumn();
final String typeName = complexColumn.getTypeName();
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
}
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return new ColumnAnalysis(typeName, 0, null, null);
}
final int length = column.getLength();
long size = 0;
if (analysisHasSize(analysisTypes)) {
for (int i = 0; i < length; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
if (analyzingSize()) {
if (capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
size = ((long) length) * sizePerRow;
}
return new ColumnAnalysis(typeName, size, null, null);
}
private List<String> getStorageAdapterColumnNames(StorageAdapter adapter)
{
Indexed<String> dims = adapter.getAvailableDimensions();
Iterable<String> metrics = adapter.getAvailableMetrics();
Iterable<String> columnNames = Iterables.concat(dims, metrics);
List<String> sortedColumnNames = Lists.newArrayList(columnNames);
Collections.sort(sortedColumnNames);
return sortedColumnNames;
}
private ColumnAnalysis lengthBasedAnalysisForAdapter(
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes,
String type, ColumnCapabilities capabilities,
int numRows, final int numBytes
)
{
if (capabilities != null && capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
return new ColumnAnalysis(
type,
analysisHasSize(analysisTypes) ? numRows * numBytes : 0,
capabilities.getType().name(),
capabilities.hasMultipleValues(),
size,
null,
null
);
}
private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
private ColumnAnalysis analyzeStringColumn(
final ColumnCapabilities capabilities,
@Nullable final Column column,
final int cardinality
)
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
long size = 0;
if (column != null && analyzingSize()) {
if (!capabilities.hasBitmapIndexes()) {
return ColumnAnalysis.error("string_no_bitmap");
}
final BitmapIndex bitmapIndex = column.getBitmapIndex();
if (cardinality != bitmapIndex.getCardinality()) {
return ColumnAnalysis.error("bitmap_wrong_cardinality");
}
for (int i = 0; i < cardinality; ++i) {
String value = bitmapIndex.getValue(i);
if (value != null) {
size += StringUtils.toUtf8(value).length * bitmapIndex.getBitmap(value).size();
}
}
}
return new ColumnAnalysis(
capabilities.getType().name(),
capabilities.hasMultipleValues(),
size,
analyzingCardinality() ? cardinality : 0,
null
);
}
private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
private ColumnAnalysis analyzeComplexColumn(
@Nullable final ColumnCapabilities capabilities,
@Nullable final Column column,
final String typeName
)
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
final ComplexColumn complexColumn = column != null ? column.getComplexColumn() : null;
final boolean hasMultipleValues = capabilities != null && capabilities.hasMultipleValues();
long size = 0;
if (analyzingSize() && complexColumn != null) {
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
if (serde == null) {
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
}
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null);
}
final int length = column.getLength();
for (int i = 0; i < length; ++i) {
size += inputSizeFn.apply(complexColumn.getRowValue(i));
}
}
return new ColumnAnalysis(
typeName,
hasMultipleValues,
size,
null,
null
);
}
}
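As restructured above, `SegmentAnalyzer` is now configured once with the query's analysis types and then handed a `Segment`. A minimal usage sketch, matching the runner-factory change later in this diff (`query` and `segment` are assumed to be in scope):
```
// Usage sketch: the analyzer is built per query from the requested analysis
// types, then asked for the per-column analyses and the segment's row count.
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
final int numRows = analyzer.numRows(segment);
```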

View File

@ -148,7 +148,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
}
List<Interval> newIntervals = null;
if (query.hasInterval()) {
if (query.analyzingInterval()) {
//List returned by arg1.getIntervals() is immutable, so a new list needs to
//be created.
newIntervals = new ArrayList<>(arg1.getIntervals());

View File

@ -43,9 +43,7 @@ import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;
import java.util.ArrayList;
@ -60,7 +58,6 @@ import java.util.concurrent.TimeoutException;
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class);
@ -86,23 +83,12 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
final QueryableIndex index = segment.asQueryableIndex();
final Map<String, ColumnAnalysis> analyzedColumns;
final int numRows;
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
final int numRows = analyzer.numRows(segment);
long totalSize = 0;
if (index == null) {
// IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex
StorageAdapter segmentAdapter = segment.asStorageAdapter();
analyzedColumns = analyzer.analyze(segmentAdapter, query.getAnalysisTypes());
numRows = segmentAdapter.getNumRows();
} else {
analyzedColumns = analyzer.analyze(index, query.getAnalysisTypes());
numRows = index.getNumRows();
}
if (query.hasSize()) {
if (analyzer.analyzingSize()) {
// Initialize with the size of the whitespace: 1 byte per column per row
totalSize = analyzedColumns.size() * numRows;
}
@ -120,7 +106,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
columns.put(columnName, column);
}
}
List<Interval> retIntervals = query.hasInterval() ? Arrays.asList(segment.getDataInterval()) : null;
List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null;
return Sequences.simple(
Arrays.asList(

View File

@ -22,18 +22,21 @@ package io.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
 */
public class ColumnAnalysis
{
private static final String ERROR_PREFIX = "error:";
public static ColumnAnalysis error(String reason)
{
return new ColumnAnalysis("STRING", -1, null, ERROR_PREFIX + reason);
return new ColumnAnalysis("STRING", false, -1, null, ERROR_PREFIX + reason);
}
private final String type;
private final boolean hasMultipleValues;
private final long size;
private final Integer cardinality;
private final String errorMessage;
@ -41,12 +44,14 @@ public class ColumnAnalysis
@JsonCreator
public ColumnAnalysis(
@JsonProperty("type") String type,
@JsonProperty("hasMultipleValues") boolean hasMultipleValues,
@JsonProperty("size") long size,
@JsonProperty("cardinality") Integer cardinality,
@JsonProperty("errorMessage") String errorMessage
)
{
this.type = type;
this.hasMultipleValues = hasMultipleValues;
this.size = size;
this.cardinality = cardinality;
this.errorMessage = errorMessage;
@ -58,6 +63,12 @@ public class ColumnAnalysis
return type;
}
@JsonProperty
public boolean isHasMultipleValues()
{
return hasMultipleValues;
}
@JsonProperty
public long getSize()
{
@ -96,14 +107,19 @@ public class ColumnAnalysis
if (cardinality == null) {
cardinality = rhsCardinality;
}
else {
} else {
if (rhsCardinality != null) {
cardinality = Math.max(cardinality, rhsCardinality);
}
}
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality, null);
return new ColumnAnalysis(
type,
hasMultipleValues || rhs.isHasMultipleValues(),
size + rhs.getSize(),
cardinality,
null
);
}
@Override
@ -111,6 +127,7 @@ public class ColumnAnalysis
{
return "ColumnAnalysis{" +
"type='" + type + '\'' +
", hasMultipleValues=" + hasMultipleValues +
", size=" + size +
", cardinality=" + cardinality +
", errorMessage='" + errorMessage + '\'' +
@ -126,29 +143,17 @@ public class ColumnAnalysis
if (o == null || getClass() != o.getClass()) {
return false;
}
ColumnAnalysis that = (ColumnAnalysis) o;
if (size != that.size) {
return false;
}
if (type != null ? !type.equals(that.type) : that.type != null) {
return false;
}
if (cardinality != null ? !cardinality.equals(that.cardinality) : that.cardinality != null) {
return false;
}
return !(errorMessage != null ? !errorMessage.equals(that.errorMessage) : that.errorMessage != null);
return hasMultipleValues == that.hasMultipleValues &&
size == that.size &&
Objects.equals(type, that.type) &&
Objects.equals(cardinality, that.cardinality) &&
Objects.equals(errorMessage, that.errorMessage);
}
@Override
public int hashCode()
{
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (int) (size ^ (size >>> 32));
result = 31 * result + (cardinality != null ? cardinality.hashCode() : 0);
result = 31 * result + (errorMessage != null ? errorMessage.hashCode() : 0);
return result;
return Objects.hash(type, hasMultipleValues, size, cardinality, errorMessage);
}
}

View File

@ -98,6 +98,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
return "SegmentAnalysis{" +
"id='" + id + '\'' +
", interval=" + interval +
", columns=" + columns +
", size=" + size +
", numRows=" + numRows +
'}';

View File

@ -151,22 +151,12 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
}
@JsonProperty
public EnumSet getAnalysisTypes()
public EnumSet<AnalysisType> getAnalysisTypes()
{
return analysisTypes;
}
public boolean hasCardinality()
{
return analysisTypes.contains(AnalysisType.CARDINALITY);
}
public boolean hasSize()
{
return analysisTypes.contains(AnalysisType.SIZE);
}
public boolean hasInterval()
public boolean analyzingInterval()
{
return analysisTypes.contains(AnalysisType.INTERVAL);
}

View File

@ -152,6 +152,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return index.getColumn(column).getCapabilities();
}
@Override
public String getColumnTypeName(String columnName)
{
final Column column = index.getColumn(columnName);
final ComplexColumn complexColumn = column.getComplexColumn();
return complexColumn != null ? complexColumn.getTypeName() : column.getCapabilities().getType().toString();
}
@Override
public DateTime getMaxIngestedEventTime()
{

View File

@ -46,6 +46,13 @@ public interface StorageAdapter extends CursorFactory
public DateTime getMaxTime();
public Capabilities getCapabilities();
public ColumnCapabilities getColumnCapabilities(String column);
/**
* Like {@link ColumnCapabilities#getType()}, but may return a more descriptive string for complex columns.
* @param column column name
* @return type name
*/
public String getColumnTypeName(String column);
public int getNumRows();
public DateTime getMaxIngestedEventTime();
}
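A minimal sketch of calling the new method (assuming `adapter` is any `StorageAdapter` and `columnName` names an existing column): complex columns return their complex type name, while other columns fall back to the `ValueType` name, per the two implementations later in this diff.
```
// Sketch: for a COMPLEX column this returns the serde type name such as
// "hyperUnique"; for LONG/FLOAT/STRING columns it is the ValueType name.
final String typeName = adapter.getColumnTypeName(columnName);
final boolean isComplex = adapter.getColumnCapabilities(columnName).getType() == ValueType.COMPLEX;
```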

View File

@ -569,7 +569,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public String getMetricType(String metric)
{
return metricDescs.get(metric).getType();
final MetricDesc metricDesc = metricDescs.get(metric);
return metricDesc != null ? metricDesc.getType() : null;
}
public Interval getInterval()

View File

@ -147,6 +147,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return index.getCapabilities(column);
}
@Override
public String getColumnTypeName(String column)
{
final String metricType = index.getMetricType(column);
return metricType != null ? metricType : getColumnCapabilities(column).getType().toString();
}
@Override
public DateTime getMaxIngestedEventTime()
{

View File

@ -310,15 +310,13 @@ public class QueryRunnerTestHelper
)
throws IOException
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
return ImmutableList.of(
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)),
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
);
}
@ -329,10 +327,9 @@ public class QueryRunnerTestHelper
)
throws IOException
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
return Arrays.asList(
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
@ -340,8 +337,7 @@ public class QueryRunnerTestHelper
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
),
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
)
);
}
/**

View File

@ -59,7 +59,7 @@ public class SegmentAnalyzerTest
private void testIncrementalWorksHelper(EnumSet<SegmentMetadataQuery.AnalysisType> analyses) throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null),
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null),
analyses
);

View File

@ -67,6 +67,7 @@ public class SegmentMetadataQueryQueryToolChestTest
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
true,
10881,
1,
null

View File

@ -43,6 +43,7 @@ import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.ListColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.TestIndex;
@ -51,27 +52,27 @@ import io.druid.timeline.LogicalSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RunWith(Parameterized.class)
public class SegmentMetadataQueryTest
{
private final SegmentMetadataQueryRunnerFactory factory = new SegmentMetadataQueryRunnerFactory(
private static final SegmentMetadataQueryRunnerFactory FACTORY = new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
@SuppressWarnings("unchecked")
private final QueryRunner runner = makeQueryRunner(factory);
private final ObjectMapper mapper = new DefaultObjectMapper();
@SuppressWarnings("unchecked")
public static QueryRunner makeQueryRunner(
public static QueryRunner makeMMappedQueryRunner(
QueryRunnerFactory factory
)
{
@ -81,15 +82,39 @@ public class SegmentMetadataQueryTest
);
}
@SuppressWarnings("unchecked")
public static QueryRunner makeIncrementalIndexQueryRunner(
QueryRunnerFactory factory
)
{
return QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), QueryRunnerTestHelper.segmentId)
);
}
private final QueryRunner runner;
private final boolean usingMmappedSegment;
private final SegmentMetadataQuery testQuery;
private final SegmentAnalysis expectedSegmentAnalysis;
public SegmentMetadataQueryTest()
@Parameterized.Parameters(name = "runner = {1}")
public static Collection<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{makeMMappedQueryRunner(FACTORY), "mmap", true},
new Object[]{makeIncrementalIndexQueryRunner(FACTORY), "incremental", false}
);
}
public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean usingMmappedSegment)
{
this.runner = runner;
this.usingMmappedSegment = usingMmappedSegment;
testQuery = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
.toInclude(new ListColumnIncluderator(Arrays.asList("__time", "index", "placement")))
.analysisTypes(null)
.merge(true)
.build();
@ -100,14 +125,31 @@ public class SegmentMetadataQueryTest
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
),
ImmutableMap.of(
"__time",
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
12090,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
10881,
false,
usingMmappedSegment ? 10881 : 0,
1,
null
),
"index",
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
9672,
null,
null
)
), 71982,
), usingMmappedSegment ? 71982 : 32643,
1209
);
}
@ -124,6 +166,124 @@ public class SegmentMetadataQueryTest
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis), results);
}
@Test
public void testSegmentMetadataQueryWithHasMultipleValuesMerge()
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
null,
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
0,
1,
null
),
"placementish",
new ColumnAnalysis(
ValueType.STRING.toString(),
true,
0,
9,
null
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2
);
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
),
toolChest
);
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement", "placementish")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.CARDINALITY)
.merge(true)
.build(),
Maps.newHashMap()
),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testSegmentMetadataQueryWithComplexColumnMerge()
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
"merged",
null,
ImmutableMap.of(
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
0,
1,
null
),
"quality_uniques",
new ColumnAnalysis(
"hyperUnique",
false,
0,
null,
null
)
),
0,
expectedSegmentAnalysis.getNumRows() * 2
);
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
),
toolChest
);
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.intervals("2013/2014")
.toInclude(new ListColumnIncluderator(Arrays.asList("placement", "quality_uniques")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.CARDINALITY)
.merge(true)
.build(),
Maps.newHashMap()
),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
}
@Test
public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
{
@ -131,25 +291,42 @@ public class SegmentMetadataQueryTest
"merged",
ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
ImmutableMap.of(
"__time",
new ColumnAnalysis(
ValueType.LONG.toString(),
false,
12090 * 2,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
21762,
false,
usingMmappedSegment ? 21762 : 0,
1,
null
),
"index",
new ColumnAnalysis(
ValueType.FLOAT.toString(),
false,
9672 * 2,
null,
null
)
),
expectedSegmentAnalysis.getSize()*2,
expectedSegmentAnalysis.getNumRows()*2
expectedSegmentAnalysis.getSize() * 2,
expectedSegmentAnalysis.getNumRows() * 2
);
QueryToolChest toolChest = factory.getToolchest();
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
@ -178,22 +355,23 @@ public class SegmentMetadataQueryTest
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
0,
0,
null
)
),
0,
expectedSegmentAnalysis.getNumRows()*2
expectedSegmentAnalysis.getNumRows() * 2
);
QueryToolChest toolChest = factory.getToolchest();
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
)
@ -230,13 +408,13 @@ public class SegmentMetadataQueryTest
)
);
QueryToolChest toolChest = factory.getToolchest();
QueryToolChest toolChest = FACTORY.getToolchest();
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
FACTORY.mergeRunners(
MoreExecutors.sameThreadExecutor(),
//Note: It is essential to have at least 2 query runners merged to reproduce the regression bug described in
//https://github.com/druid-io/druid/pull/1172
@ -273,14 +451,14 @@ public class SegmentMetadataQueryTest
SegmentMetadataQuery.AnalysisType.SIZE
);
Query query = mapper.readValue(queryStr, Query.class);
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0));
Assert.assertEquals(expectedAnalysisTypes, ((SegmentMetadataQuery) query).getAnalysisTypes());
// test serialize and deserialize
Assert.assertEquals(query, mapper.readValue(mapper.writeValueAsString(query), Query.class));
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
}
@Test
@ -290,14 +468,14 @@ public class SegmentMetadataQueryTest
+ " \"queryType\":\"segmentMetadata\",\n"
+ " \"dataSource\":\"test_ds\"\n"
+ "}";
Query query = mapper.readValue(queryStr, Query.class);
Query query = MAPPER.readValue(queryStr, Query.class);
Assert.assertTrue(query instanceof SegmentMetadataQuery);
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
Assert.assertEquals(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT), query.getIntervals().get(0));
Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval());
// test serialize and deserialize
Assert.assertEquals(query, mapper.readValue(mapper.writeValueAsString(query), Query.class));
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
}
@Test

View File

@ -83,8 +83,8 @@ public class SearchQueryRunnerWithCaseTest
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\ta\u0001preferred\t94.874713"
);
IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input, true);
IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input, false);
IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input);
IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input);
QueryableIndex index3 = TestIndex.persistRealtimeAndLoadMMapped(index1);
QueryableIndex index4 = TestIndex.persistRealtimeAndLoadMMapped(index2);

View File

@ -121,7 +121,7 @@ public class TopNQueryQueryToolChestTest
);
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId)
);
Map<String, Object> context = Maps.newHashMap();

View File

@ -106,7 +106,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
TestCases.rtIndex,
QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId)
)
);
testCaseMap.put(
@ -123,13 +123,6 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
new QueryableIndexSegment(segmentId, TestIndex.mergedRealtimeIndex())
)
);
testCaseMap.put(
TestCases.rtIndexOffheap,
QueryRunnerTestHelper.makeQueryRunner(
factory,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(true), segmentId)
)
);
//Thread.sleep(10000);
}

View File

@ -70,7 +70,7 @@ public class TestIndex
"placementish",
"partial_null_column",
"null_column",
};
public static final String[] METRICS = new String[]{"index"};
private static final Logger log = new Logger(TestIndex.class);
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
@ -93,7 +93,7 @@ public class TestIndex
private static QueryableIndex mmappedIndex = null;
private static QueryableIndex mergedRealtime = null;
public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap)
public static IncrementalIndex getIncrementalTestIndex()
{
synchronized (log) {
if (realtimeIndex != null) {
@ -101,7 +101,7 @@ public class TestIndex
}
}
return realtimeIndex = makeRealtimeIndex("druid.sample.tsv", useOffheap);
return realtimeIndex = makeRealtimeIndex("druid.sample.tsv");
}
public static QueryableIndex getMMappedTestIndex()
@ -112,7 +112,7 @@ public class TestIndex
}
}
IncrementalIndex incrementalIndex = getIncrementalTestIndex(false);
IncrementalIndex incrementalIndex = getIncrementalTestIndex();
mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex);
return mmappedIndex;
@ -126,8 +126,8 @@ public class TestIndex
}
try {
IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false);
IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false);
IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top");
IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom");
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
@ -163,7 +163,7 @@ public class TestIndex
}
}
private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean useOffheap)
private static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
{
final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
if (resource == null) {
@ -171,10 +171,10 @@ public class TestIndex
}
log.info("Realtime loading index file[%s]", resource);
CharSource stream = Resources.asByteSource(resource).asCharSource(Charsets.UTF_8);
return makeRealtimeIndex(stream, useOffheap);
return makeRealtimeIndex(stream);
}
public static IncrementalIndex makeRealtimeIndex(final CharSource source, final boolean useOffheap)
public static IncrementalIndex makeRealtimeIndex(final CharSource source)
{
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())