mirror of https://github.com/apache/druid.git
Add StorageAdapter#getColumnTypeName, and various SegmentMetadataQuery adjustments.
SegmentMetadataQuery stuff: - Simplify implementation of SegmentAnalyzer. - Fix type names for realtime complex columns; this used to try to merge a nice type name (like "hyperUnique") from mmapped segments with the word "COMPLEX" from incremental index segments, leading to a merge failure. Now it always uses the nice name. - Add hasMultipleValues to ColumnAnalysis. - Add tests for both mmapped and incremental index segments. - Update docs to include errorMessage.
This commit is contained in:
parent
3f998117a6
commit
87c8046c6c
|
@ -39,10 +39,10 @@ The format of the result is:
|
|||
"id" : "some_id",
|
||||
"intervals" : [ "2013-05-13T00:00:00.000Z/2013-05-14T00:00:00.000Z" ],
|
||||
"columns" : {
|
||||
"__time" : { "type" : "LONG", "size" : 407240380, "cardinality" : null },
|
||||
"dim1" : { "type" : "STRING", "size" : 100000, "cardinality" : 1944 },
|
||||
"dim2" : { "type" : "STRING", "size" : 100000, "cardinality" : 1504 },
|
||||
"metric1" : { "type" : "FLOAT", "size" : 100000, "cardinality" : null }
|
||||
"__time" : { "type" : "LONG", "hasMultipleValues" : false, "size" : 407240380, "cardinality" : null, "errorMessage" : null },
|
||||
"dim1" : { "type" : "STRING", "hasMultipleValues" : false, "size" : 100000, "cardinality" : 1944, "errorMessage" : null },
|
||||
"dim2" : { "type" : "STRING", "hasMultipleValues" : true, "size" : 100000, "cardinality" : 1504, "errorMessage" : null },
|
||||
"metric1" : { "type" : "FLOAT", "hasMultipleValues" : false, "size" : 100000, "cardinality" : null, "errorMessage" : null }
|
||||
},
|
||||
"size" : 300000,
|
||||
"numRows" : 5000000
|
||||
|
@ -53,6 +53,9 @@ Dimension columns will have type `STRING`.
|
|||
Metric columns will have type `FLOAT`, `LONG`, or the name of the underlying complex type (such as `hyperUnique`) in the case of a COMPLEX metric.
|
||||
The timestamp column will have type `LONG`.
|
||||
|
||||
If the `errorMessage` field is non-null, you should not trust the other fields in the response. Their contents are
|
||||
undefined.
|
||||
|
||||
Only columns that are dimensions (i.e., have type `STRING`) will have any cardinality. The rest of the columns (timestamp and metric columns) will show cardinality as `null`.
|
||||
|
||||
### intervals
|
||||
|
|
|
@ -22,28 +22,29 @@ package io.druid.query.metadata;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.StringUtils;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.query.metadata.metadata.ColumnAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ComplexColumn;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.serde.ComplexMetricSerde;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
|
||||
import java.util.Collections;
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class SegmentAnalyzer
|
||||
{
|
||||
|
@ -59,33 +60,54 @@ public class SegmentAnalyzer
|
|||
*/
|
||||
private static final int NUM_BYTES_IN_TEXT_FLOAT = 8;
|
||||
|
||||
public Map<String, ColumnAnalysis> analyze(
|
||||
QueryableIndex index,
|
||||
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
|
||||
)
|
||||
private final EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes;
|
||||
|
||||
public SegmentAnalyzer(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
{
|
||||
Preconditions.checkNotNull(index, "Index cannot be null");
|
||||
this.analysisTypes = analysisTypes;
|
||||
}
|
||||
|
||||
public int numRows(Segment segment)
|
||||
{
|
||||
return Preconditions.checkNotNull(segment, "segment").asStorageAdapter().getNumRows();
|
||||
}
|
||||
|
||||
public Map<String, ColumnAnalysis> analyze(Segment segment)
|
||||
{
|
||||
Preconditions.checkNotNull(segment, "segment");
|
||||
|
||||
// index is null for incremental-index-based segments, but storageAdapter is always available
|
||||
final QueryableIndex index = segment.asQueryableIndex();
|
||||
final StorageAdapter storageAdapter = segment.asStorageAdapter();
|
||||
|
||||
// get length and column names from storageAdapter
|
||||
final int length = storageAdapter.getNumRows();
|
||||
final Set<String> columnNames = Sets.newHashSet();
|
||||
Iterables.addAll(columnNames, storageAdapter.getAvailableDimensions());
|
||||
Iterables.addAll(columnNames, storageAdapter.getAvailableMetrics());
|
||||
|
||||
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
|
||||
|
||||
for (String columnName : index.getColumnNames()) {
|
||||
final Column column = index.getColumn(columnName);
|
||||
final ColumnCapabilities capabilities = column.getCapabilities();
|
||||
for (String columnName : columnNames) {
|
||||
final Column column = index == null ? null : index.getColumn(columnName);
|
||||
final ColumnCapabilities capabilities = column != null
|
||||
? column.getCapabilities()
|
||||
: storageAdapter.getColumnCapabilities(columnName);
|
||||
|
||||
final ColumnAnalysis analysis;
|
||||
final ValueType type = capabilities.getType();
|
||||
switch (type) {
|
||||
case LONG:
|
||||
analysis = analyzeLongColumn(column, analysisTypes);
|
||||
analysis = analyzeNumericColumn(capabilities, length, Longs.BYTES);
|
||||
break;
|
||||
case FLOAT:
|
||||
analysis = analyzeFloatColumn(column, analysisTypes);
|
||||
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
|
||||
break;
|
||||
case STRING:
|
||||
analysis = analyzeStringColumn(column, analysisTypes);
|
||||
analysis = analyzeStringColumn(capabilities, column, storageAdapter.getDimensionCardinality(columnName));
|
||||
break;
|
||||
case COMPLEX:
|
||||
analysis = analyzeComplexColumn(column, analysisTypes);
|
||||
analysis = analyzeComplexColumn(capabilities, column, storageAdapter.getColumnTypeName(columnName));
|
||||
break;
|
||||
default:
|
||||
log.warn("Unknown column type[%s].", type);
|
||||
|
@ -95,201 +117,122 @@ public class SegmentAnalyzer
|
|||
columns.put(columnName, analysis);
|
||||
}
|
||||
|
||||
// Add time column too
|
||||
ColumnCapabilities timeCapabilities = storageAdapter.getColumnCapabilities(Column.TIME_COLUMN_NAME);
|
||||
if (timeCapabilities == null) {
|
||||
timeCapabilities = new ColumnCapabilitiesImpl().setType(ValueType.LONG).setHasMultipleValues(false);
|
||||
}
|
||||
columns.put(
|
||||
Column.TIME_COLUMN_NAME,
|
||||
lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP, analysisTypes)
|
||||
analyzeNumericColumn(timeCapabilities, length, NUM_BYTES_IN_TIMESTAMP)
|
||||
);
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
public Map<String, ColumnAnalysis> analyze(
|
||||
StorageAdapter adapter,
|
||||
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
|
||||
public boolean analyzingSize()
|
||||
{
|
||||
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
|
||||
}
|
||||
|
||||
public boolean analyzingCardinality()
|
||||
{
|
||||
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
|
||||
}
|
||||
|
||||
private ColumnAnalysis analyzeNumericColumn(
|
||||
final ColumnCapabilities capabilities,
|
||||
final int length,
|
||||
final int sizePerRow
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(adapter, "Adapter cannot be null");
|
||||
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
|
||||
List<String> columnNames = getStorageAdapterColumnNames(adapter);
|
||||
|
||||
int numRows = adapter.getNumRows();
|
||||
for (String columnName : columnNames) {
|
||||
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName);
|
||||
final ColumnAnalysis analysis;
|
||||
|
||||
/**
|
||||
* StorageAdapter doesn't provide a way to get column values, so size is
|
||||
* not calculated for STRING and COMPLEX columns.
|
||||
*/
|
||||
ValueType capType = capabilities.getType();
|
||||
switch (capType) {
|
||||
case LONG:
|
||||
analysis = lengthBasedAnalysisForAdapter(
|
||||
analysisTypes,
|
||||
capType.name(), capabilities,
|
||||
numRows, Longs.BYTES
|
||||
);
|
||||
break;
|
||||
case FLOAT:
|
||||
analysis = lengthBasedAnalysisForAdapter(
|
||||
analysisTypes,
|
||||
capType.name(), capabilities,
|
||||
numRows, NUM_BYTES_IN_TEXT_FLOAT
|
||||
);
|
||||
break;
|
||||
case STRING:
|
||||
analysis = new ColumnAnalysis(
|
||||
capType.name(),
|
||||
0,
|
||||
analysisHasCardinality(analysisTypes) ? adapter.getDimensionCardinality(columnName) : 0,
|
||||
null
|
||||
);
|
||||
break;
|
||||
case COMPLEX:
|
||||
analysis = new ColumnAnalysis(
|
||||
capType.name(),
|
||||
0,
|
||||
null,
|
||||
null
|
||||
);
|
||||
break;
|
||||
default:
|
||||
log.warn("Unknown column type[%s].", capType);
|
||||
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType));
|
||||
}
|
||||
|
||||
columns.put(columnName, analysis);
|
||||
}
|
||||
|
||||
columns.put(
|
||||
Column.TIME_COLUMN_NAME,
|
||||
lengthBasedAnalysisForAdapter(analysisTypes, ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP)
|
||||
);
|
||||
|
||||
return columns;
|
||||
}
|
||||
|
||||
|
||||
public ColumnAnalysis analyzeLongColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
{
|
||||
return lengthBasedAnalysis(column, Longs.BYTES, analysisTypes);
|
||||
}
|
||||
|
||||
public ColumnAnalysis analyzeFloatColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
{
|
||||
return lengthBasedAnalysis(column, NUM_BYTES_IN_TEXT_FLOAT, analysisTypes);
|
||||
}
|
||||
|
||||
private ColumnAnalysis lengthBasedAnalysis(
|
||||
Column column,
|
||||
final int numBytes,
|
||||
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes
|
||||
)
|
||||
{
|
||||
final ColumnCapabilities capabilities = column.getCapabilities();
|
||||
if (capabilities.hasMultipleValues()) {
|
||||
return ColumnAnalysis.error("multi_value");
|
||||
}
|
||||
|
||||
int size = 0;
|
||||
if (analysisHasSize(analysisTypes)) {
|
||||
size = column.getLength() * numBytes;
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(capabilities.getType().name(), size, null, null);
|
||||
}
|
||||
|
||||
public ColumnAnalysis analyzeStringColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
{
|
||||
final ColumnCapabilities capabilities = column.getCapabilities();
|
||||
|
||||
if (capabilities.hasBitmapIndexes()) {
|
||||
final BitmapIndex bitmapIndex = column.getBitmapIndex();
|
||||
|
||||
int cardinality = bitmapIndex.getCardinality();
|
||||
long size = 0;
|
||||
|
||||
if (analysisHasSize(analysisTypes)) {
|
||||
for (int i = 0; i < cardinality; ++i) {
|
||||
String value = bitmapIndex.getValue(i);
|
||||
if (value != null) {
|
||||
size += StringUtils.toUtf8(value).length * bitmapIndex.getBitmap(value).size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(
|
||||
capabilities.getType().name(),
|
||||
size,
|
||||
analysisHasCardinality(analysisTypes) ? cardinality : 0,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
return ColumnAnalysis.error("string_no_bitmap");
|
||||
}
|
||||
|
||||
public ColumnAnalysis analyzeComplexColumn(Column column, EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
{
|
||||
final ColumnCapabilities capabilities = column.getCapabilities();
|
||||
final ComplexColumn complexColumn = column.getComplexColumn();
|
||||
|
||||
final String typeName = complexColumn.getTypeName();
|
||||
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
|
||||
if (serde == null) {
|
||||
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
|
||||
}
|
||||
|
||||
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
|
||||
if (inputSizeFn == null) {
|
||||
return new ColumnAnalysis(typeName, 0, null, null);
|
||||
}
|
||||
|
||||
final int length = column.getLength();
|
||||
long size = 0;
|
||||
if (analysisHasSize(analysisTypes)) {
|
||||
for (int i = 0; i < length; ++i) {
|
||||
size += inputSizeFn.apply(complexColumn.getRowValue(i));
|
||||
|
||||
if (analyzingSize()) {
|
||||
if (capabilities.hasMultipleValues()) {
|
||||
return ColumnAnalysis.error("multi_value");
|
||||
}
|
||||
|
||||
size = ((long) length) * sizePerRow;
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(typeName, size, null, null);
|
||||
}
|
||||
|
||||
private List<String> getStorageAdapterColumnNames(StorageAdapter adapter)
|
||||
{
|
||||
Indexed<String> dims = adapter.getAvailableDimensions();
|
||||
Iterable<String> metrics = adapter.getAvailableMetrics();
|
||||
Iterable<String> columnNames = Iterables.concat(dims, metrics);
|
||||
List<String> sortedColumnNames = Lists.newArrayList(columnNames);
|
||||
Collections.sort(sortedColumnNames);
|
||||
return sortedColumnNames;
|
||||
}
|
||||
|
||||
private ColumnAnalysis lengthBasedAnalysisForAdapter(
|
||||
EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes,
|
||||
String type, ColumnCapabilities capabilities,
|
||||
int numRows, final int numBytes
|
||||
)
|
||||
{
|
||||
if (capabilities != null && capabilities.hasMultipleValues()) {
|
||||
return ColumnAnalysis.error("multi_value");
|
||||
}
|
||||
return new ColumnAnalysis(
|
||||
type,
|
||||
analysisHasSize(analysisTypes) ? numRows * numBytes : 0,
|
||||
capabilities.getType().name(),
|
||||
capabilities.hasMultipleValues(),
|
||||
size,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private boolean analysisHasSize(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
private ColumnAnalysis analyzeStringColumn(
|
||||
final ColumnCapabilities capabilities,
|
||||
@Nullable final Column column,
|
||||
final int cardinality
|
||||
)
|
||||
{
|
||||
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.SIZE);
|
||||
long size = 0;
|
||||
|
||||
if (column != null && analyzingSize()) {
|
||||
if (!capabilities.hasBitmapIndexes()) {
|
||||
return ColumnAnalysis.error("string_no_bitmap");
|
||||
}
|
||||
|
||||
final BitmapIndex bitmapIndex = column.getBitmapIndex();
|
||||
if (cardinality != bitmapIndex.getCardinality()) {
|
||||
return ColumnAnalysis.error("bitmap_wrong_cardinality");
|
||||
}
|
||||
|
||||
for (int i = 0; i < cardinality; ++i) {
|
||||
String value = bitmapIndex.getValue(i);
|
||||
if (value != null) {
|
||||
size += StringUtils.toUtf8(value).length * bitmapIndex.getBitmap(value).size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(
|
||||
capabilities.getType().name(),
|
||||
capabilities.hasMultipleValues(),
|
||||
size,
|
||||
analyzingCardinality() ? cardinality : 0,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private boolean analysisHasCardinality(EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes)
|
||||
private ColumnAnalysis analyzeComplexColumn(
|
||||
@Nullable final ColumnCapabilities capabilities,
|
||||
@Nullable final Column column,
|
||||
final String typeName
|
||||
)
|
||||
{
|
||||
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
|
||||
final ComplexColumn complexColumn = column != null ? column.getComplexColumn() : null;
|
||||
final boolean hasMultipleValues = capabilities != null && capabilities.hasMultipleValues();
|
||||
long size = 0;
|
||||
|
||||
if (analyzingSize() && complexColumn != null) {
|
||||
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
|
||||
if (serde == null) {
|
||||
return ColumnAnalysis.error(String.format("unknown_complex_%s", typeName));
|
||||
}
|
||||
|
||||
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
|
||||
if (inputSizeFn == null) {
|
||||
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null);
|
||||
}
|
||||
|
||||
final int length = column.getLength();
|
||||
for (int i = 0; i < length; ++i) {
|
||||
size += inputSizeFn.apply(complexColumn.getRowValue(i));
|
||||
}
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(
|
||||
typeName,
|
||||
hasMultipleValues,
|
||||
size,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
|
|||
}
|
||||
|
||||
List<Interval> newIntervals = null;
|
||||
if (query.hasInterval()) {
|
||||
if (query.analyzingInterval()) {
|
||||
//List returned by arg1.getIntervals() is immutable, so a new list needs to
|
||||
//be created.
|
||||
newIntervals = new ArrayList<>(arg1.getIntervals());
|
||||
|
|
|
@ -43,9 +43,7 @@ import io.druid.query.metadata.metadata.ColumnAnalysis;
|
|||
import io.druid.query.metadata.metadata.ColumnIncluderator;
|
||||
import io.druid.query.metadata.metadata.SegmentAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -60,7 +58,6 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
|
||||
{
|
||||
private static final SegmentAnalyzer analyzer = new SegmentAnalyzer();
|
||||
private static final Logger log = new Logger(SegmentMetadataQueryRunnerFactory.class);
|
||||
|
||||
|
||||
|
@ -86,23 +83,12 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
|
||||
{
|
||||
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
|
||||
|
||||
final QueryableIndex index = segment.asQueryableIndex();
|
||||
|
||||
final Map<String, ColumnAnalysis> analyzedColumns;
|
||||
final int numRows;
|
||||
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
|
||||
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
|
||||
final int numRows = analyzer.numRows(segment);
|
||||
long totalSize = 0;
|
||||
if (index == null) {
|
||||
// IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex
|
||||
StorageAdapter segmentAdapter = segment.asStorageAdapter();
|
||||
analyzedColumns = analyzer.analyze(segmentAdapter, query.getAnalysisTypes());
|
||||
numRows = segmentAdapter.getNumRows();
|
||||
} else {
|
||||
analyzedColumns = analyzer.analyze(index, query.getAnalysisTypes());
|
||||
numRows = index.getNumRows();
|
||||
}
|
||||
|
||||
if (query.hasSize()) {
|
||||
if (analyzer.analyzingSize()) {
|
||||
// Initialize with the size of the whitespace, 1 byte per
|
||||
totalSize = analyzedColumns.size() * numRows;
|
||||
}
|
||||
|
@ -120,7 +106,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
|
|||
columns.put(columnName, column);
|
||||
}
|
||||
}
|
||||
List<Interval> retIntervals = query.hasInterval() ? Arrays.asList(segment.getDataInterval()) : null;
|
||||
List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null;
|
||||
|
||||
return Sequences.simple(
|
||||
Arrays.asList(
|
||||
|
|
|
@ -22,18 +22,21 @@ package io.druid.query.metadata.metadata;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
*/
|
||||
*/
|
||||
public class ColumnAnalysis
|
||||
{
|
||||
private static final String ERROR_PREFIX = "error:";
|
||||
|
||||
public static ColumnAnalysis error(String reason)
|
||||
{
|
||||
return new ColumnAnalysis("STRING", -1, null, ERROR_PREFIX + reason);
|
||||
return new ColumnAnalysis("STRING", false, -1, null, ERROR_PREFIX + reason);
|
||||
}
|
||||
|
||||
private final String type;
|
||||
private final boolean hasMultipleValues;
|
||||
private final long size;
|
||||
private final Integer cardinality;
|
||||
private final String errorMessage;
|
||||
|
@ -41,12 +44,14 @@ public class ColumnAnalysis
|
|||
@JsonCreator
|
||||
public ColumnAnalysis(
|
||||
@JsonProperty("type") String type,
|
||||
@JsonProperty("hasMultipleValues") boolean hasMultipleValues,
|
||||
@JsonProperty("size") long size,
|
||||
@JsonProperty("cardinality") Integer cardinality,
|
||||
@JsonProperty("errorMessage") String errorMessage
|
||||
)
|
||||
{
|
||||
this.type = type;
|
||||
this.hasMultipleValues = hasMultipleValues;
|
||||
this.size = size;
|
||||
this.cardinality = cardinality;
|
||||
this.errorMessage = errorMessage;
|
||||
|
@ -58,6 +63,12 @@ public class ColumnAnalysis
|
|||
return type;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isHasMultipleValues()
|
||||
{
|
||||
return hasMultipleValues;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getSize()
|
||||
{
|
||||
|
@ -96,14 +107,19 @@ public class ColumnAnalysis
|
|||
if (cardinality == null) {
|
||||
|
||||
cardinality = rhsCardinality;
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
if (rhsCardinality != null) {
|
||||
cardinality = Math.max(cardinality, rhsCardinality);
|
||||
}
|
||||
}
|
||||
|
||||
return new ColumnAnalysis(type, size + rhs.getSize(), cardinality, null);
|
||||
return new ColumnAnalysis(
|
||||
type,
|
||||
hasMultipleValues || rhs.isHasMultipleValues(),
|
||||
size + rhs.getSize(),
|
||||
cardinality,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,6 +127,7 @@ public class ColumnAnalysis
|
|||
{
|
||||
return "ColumnAnalysis{" +
|
||||
"type='" + type + '\'' +
|
||||
", hasMultipleValues=" + hasMultipleValues +
|
||||
", size=" + size +
|
||||
", cardinality=" + cardinality +
|
||||
", errorMessage='" + errorMessage + '\'' +
|
||||
|
@ -126,29 +143,17 @@ public class ColumnAnalysis
|
|||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ColumnAnalysis that = (ColumnAnalysis) o;
|
||||
|
||||
if (size != that.size) {
|
||||
return false;
|
||||
}
|
||||
if (type != null ? !type.equals(that.type) : that.type != null) {
|
||||
return false;
|
||||
}
|
||||
if (cardinality != null ? !cardinality.equals(that.cardinality) : that.cardinality != null) {
|
||||
return false;
|
||||
}
|
||||
return !(errorMessage != null ? !errorMessage.equals(that.errorMessage) : that.errorMessage != null);
|
||||
|
||||
return hasMultipleValues == that.hasMultipleValues &&
|
||||
size == that.size &&
|
||||
Objects.equals(type, that.type) &&
|
||||
Objects.equals(cardinality, that.cardinality) &&
|
||||
Objects.equals(errorMessage, that.errorMessage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = type != null ? type.hashCode() : 0;
|
||||
result = 31 * result + (int) (size ^ (size >>> 32));
|
||||
result = 31 * result + (cardinality != null ? cardinality.hashCode() : 0);
|
||||
result = 31 * result + (errorMessage != null ? errorMessage.hashCode() : 0);
|
||||
return result;
|
||||
return Objects.hash(type, hasMultipleValues, size, cardinality, errorMessage);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
|
|||
return "SegmentAnalysis{" +
|
||||
"id='" + id + '\'' +
|
||||
", interval=" + interval +
|
||||
", columns=" + columns +
|
||||
", size=" + size +
|
||||
", numRows=" + numRows +
|
||||
'}';
|
||||
|
|
|
@ -151,22 +151,12 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public EnumSet getAnalysisTypes()
|
||||
public EnumSet<AnalysisType> getAnalysisTypes()
|
||||
{
|
||||
return analysisTypes;
|
||||
}
|
||||
|
||||
public boolean hasCardinality()
|
||||
{
|
||||
return analysisTypes.contains(AnalysisType.CARDINALITY);
|
||||
}
|
||||
|
||||
public boolean hasSize()
|
||||
{
|
||||
return analysisTypes.contains(AnalysisType.SIZE);
|
||||
}
|
||||
|
||||
public boolean hasInterval()
|
||||
public boolean analyzingInterval()
|
||||
{
|
||||
return analysisTypes.contains(AnalysisType.INTERVAL);
|
||||
}
|
||||
|
|
|
@ -152,6 +152,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
return index.getColumn(column).getCapabilities();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getColumnTypeName(String columnName)
|
||||
{
|
||||
final Column column = index.getColumn(columnName);
|
||||
final ComplexColumn complexColumn = column.getComplexColumn();
|
||||
return complexColumn != null ? complexColumn.getTypeName() : column.getCapabilities().getType().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getMaxIngestedEventTime()
|
||||
{
|
||||
|
|
|
@ -46,6 +46,13 @@ public interface StorageAdapter extends CursorFactory
|
|||
public DateTime getMaxTime();
|
||||
public Capabilities getCapabilities();
|
||||
public ColumnCapabilities getColumnCapabilities(String column);
|
||||
|
||||
/**
|
||||
* Like {@link ColumnCapabilities#getType()}, but may return a more descriptive string for complex columns.
|
||||
* @param column column name
|
||||
* @return type name
|
||||
*/
|
||||
public String getColumnTypeName(String column);
|
||||
public int getNumRows();
|
||||
public DateTime getMaxIngestedEventTime();
|
||||
}
|
||||
|
|
|
@ -569,7 +569,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
|
||||
public String getMetricType(String metric)
|
||||
{
|
||||
return metricDescs.get(metric).getType();
|
||||
final MetricDesc metricDesc = metricDescs.get(metric);
|
||||
return metricDesc != null ? metricDesc.getType() : null;
|
||||
}
|
||||
|
||||
public Interval getInterval()
|
||||
|
|
|
@ -147,6 +147,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
return index.getCapabilities(column);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getColumnTypeName(String column)
|
||||
{
|
||||
final String metricType = index.getMetricType(column);
|
||||
return metricType != null ? metricType : getColumnCapabilities(column).getType().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTime getMaxIngestedEventTime()
|
||||
{
|
||||
|
|
|
@ -310,15 +310,13 @@ public class QueryRunnerTestHelper
|
|||
)
|
||||
throws IOException
|
||||
{
|
||||
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
|
||||
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
|
||||
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
|
||||
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
|
||||
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
|
||||
return ImmutableList.of(
|
||||
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
|
||||
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)),
|
||||
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)),
|
||||
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
|
||||
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -329,10 +327,9 @@ public class QueryRunnerTestHelper
|
|||
)
|
||||
throws IOException
|
||||
{
|
||||
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex(false);
|
||||
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
|
||||
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
|
||||
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
|
||||
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
|
||||
|
||||
return Arrays.asList(
|
||||
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
|
||||
|
@ -340,8 +337,7 @@ public class QueryRunnerTestHelper
|
|||
makeUnionQueryRunner(
|
||||
factory,
|
||||
new QueryableIndexSegment(segmentId, mergedRealtimeIndex)
|
||||
),
|
||||
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
|
||||
)
|
||||
);
|
||||
}
|
||||
/**
|
||||
|
|
|
@ -59,7 +59,7 @@ public class SegmentAnalyzerTest
|
|||
private void testIncrementalWorksHelper(EnumSet<SegmentMetadataQuery.AnalysisType> analyses) throws Exception
|
||||
{
|
||||
final List<SegmentAnalysis> results = getSegmentAnalysises(
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null),
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), null),
|
||||
analyses
|
||||
);
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ public class SegmentMetadataQueryQueryToolChestTest
|
|||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
true,
|
||||
10881,
|
||||
1,
|
||||
null
|
||||
|
|
|
@ -43,6 +43,7 @@ import io.druid.query.metadata.metadata.ColumnAnalysis;
|
|||
import io.druid.query.metadata.metadata.ListColumnIncluderator;
|
||||
import io.druid.query.metadata.metadata.SegmentAnalysis;
|
||||
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.QueryableIndexSegment;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.TestIndex;
|
||||
|
@ -51,27 +52,27 @@ import io.druid.timeline.LogicalSegment;
|
|||
import org.joda.time.Interval;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class SegmentMetadataQueryTest
|
||||
{
|
||||
private final SegmentMetadataQueryRunnerFactory factory = new SegmentMetadataQueryRunnerFactory(
|
||||
private static final SegmentMetadataQueryRunnerFactory FACTORY = new SegmentMetadataQueryRunnerFactory(
|
||||
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private final QueryRunner runner = makeQueryRunner(factory);
|
||||
|
||||
private final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static QueryRunner makeQueryRunner(
|
||||
public static QueryRunner makeMMappedQueryRunner(
|
||||
QueryRunnerFactory factory
|
||||
)
|
||||
{
|
||||
|
@ -81,15 +82,39 @@ public class SegmentMetadataQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static QueryRunner makeIncrementalIndexQueryRunner(
|
||||
QueryRunnerFactory factory
|
||||
)
|
||||
{
|
||||
return QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), QueryRunnerTestHelper.segmentId)
|
||||
);
|
||||
}
|
||||
|
||||
private final QueryRunner runner;
|
||||
private final boolean usingMmappedSegment;
|
||||
private final SegmentMetadataQuery testQuery;
|
||||
private final SegmentAnalysis expectedSegmentAnalysis;
|
||||
|
||||
public SegmentMetadataQueryTest()
|
||||
@Parameterized.Parameters(name = "runner = {1}")
|
||||
public static Collection<Object[]> constructorFeeder()
|
||||
{
|
||||
return ImmutableList.of(
|
||||
new Object[]{makeMMappedQueryRunner(FACTORY), "mmap", true},
|
||||
new Object[]{makeIncrementalIndexQueryRunner(FACTORY), "incremental", false}
|
||||
);
|
||||
}
|
||||
|
||||
public SegmentMetadataQueryTest(QueryRunner runner, String runnerName, boolean usingMmappedSegment)
|
||||
{
|
||||
this.runner = runner;
|
||||
this.usingMmappedSegment = usingMmappedSegment;
|
||||
testQuery = Druids.newSegmentMetadataQueryBuilder()
|
||||
.dataSource("testing")
|
||||
.intervals("2013/2014")
|
||||
.toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
|
||||
.toInclude(new ListColumnIncluderator(Arrays.asList("__time", "index", "placement")))
|
||||
.analysisTypes(null)
|
||||
.merge(true)
|
||||
.build();
|
||||
|
@ -100,14 +125,31 @@ public class SegmentMetadataQueryTest
|
|||
new Interval("2011-01-12T00:00:00.000Z/2011-04-15T00:00:00.001Z")
|
||||
),
|
||||
ImmutableMap.of(
|
||||
"__time",
|
||||
new ColumnAnalysis(
|
||||
ValueType.LONG.toString(),
|
||||
false,
|
||||
12090,
|
||||
null,
|
||||
null
|
||||
),
|
||||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
10881,
|
||||
false,
|
||||
usingMmappedSegment ? 10881 : 0,
|
||||
1,
|
||||
null
|
||||
),
|
||||
"index",
|
||||
new ColumnAnalysis(
|
||||
ValueType.FLOAT.toString(),
|
||||
false,
|
||||
9672,
|
||||
null,
|
||||
null
|
||||
)
|
||||
), 71982,
|
||||
), usingMmappedSegment ? 71982 : 32643,
|
||||
1209
|
||||
);
|
||||
}
|
||||
|
@ -124,6 +166,124 @@ public class SegmentMetadataQueryTest
|
|||
Assert.assertEquals(Arrays.asList(expectedSegmentAnalysis), results);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSegmentMetadataQueryWithHasMultipleValuesMerge()
|
||||
{
|
||||
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
|
||||
"merged",
|
||||
null,
|
||||
ImmutableMap.of(
|
||||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
false,
|
||||
0,
|
||||
1,
|
||||
null
|
||||
),
|
||||
"placementish",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
true,
|
||||
0,
|
||||
9,
|
||||
null
|
||||
)
|
||||
),
|
||||
0,
|
||||
expectedSegmentAnalysis.getNumRows() * 2
|
||||
);
|
||||
|
||||
QueryToolChest toolChest = FACTORY.getToolchest();
|
||||
|
||||
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
|
||||
ExecutorService exec = Executors.newCachedThreadPool();
|
||||
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(
|
||||
FACTORY.mergeRunners(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
|
||||
)
|
||||
),
|
||||
toolChest
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(
|
||||
ImmutableList.of(mergedSegmentAnalysis),
|
||||
myRunner.run(
|
||||
Druids.newSegmentMetadataQueryBuilder()
|
||||
.dataSource("testing")
|
||||
.intervals("2013/2014")
|
||||
.toInclude(new ListColumnIncluderator(Arrays.asList("placement", "placementish")))
|
||||
.analysisTypes(SegmentMetadataQuery.AnalysisType.CARDINALITY)
|
||||
.merge(true)
|
||||
.build(),
|
||||
Maps.newHashMap()
|
||||
),
|
||||
"failed SegmentMetadata merging query"
|
||||
);
|
||||
exec.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSegmentMetadataQueryWithComplexColumnMerge()
|
||||
{
|
||||
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
|
||||
"merged",
|
||||
null,
|
||||
ImmutableMap.of(
|
||||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
false,
|
||||
0,
|
||||
1,
|
||||
null
|
||||
),
|
||||
"quality_uniques",
|
||||
new ColumnAnalysis(
|
||||
"hyperUnique",
|
||||
false,
|
||||
0,
|
||||
null,
|
||||
null
|
||||
)
|
||||
),
|
||||
0,
|
||||
expectedSegmentAnalysis.getNumRows() * 2
|
||||
);
|
||||
|
||||
QueryToolChest toolChest = FACTORY.getToolchest();
|
||||
|
||||
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
|
||||
ExecutorService exec = Executors.newCachedThreadPool();
|
||||
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(
|
||||
FACTORY.mergeRunners(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
|
||||
)
|
||||
),
|
||||
toolChest
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedObjects(
|
||||
ImmutableList.of(mergedSegmentAnalysis),
|
||||
myRunner.run(
|
||||
Druids.newSegmentMetadataQueryBuilder()
|
||||
.dataSource("testing")
|
||||
.intervals("2013/2014")
|
||||
.toInclude(new ListColumnIncluderator(Arrays.asList("placement", "quality_uniques")))
|
||||
.analysisTypes(SegmentMetadataQuery.AnalysisType.CARDINALITY)
|
||||
.merge(true)
|
||||
.build(),
|
||||
Maps.newHashMap()
|
||||
),
|
||||
"failed SegmentMetadata merging query"
|
||||
);
|
||||
exec.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
|
||||
{
|
||||
|
@ -131,25 +291,42 @@ public class SegmentMetadataQueryTest
|
|||
"merged",
|
||||
ImmutableList.of(expectedSegmentAnalysis.getIntervals().get(0)),
|
||||
ImmutableMap.of(
|
||||
"__time",
|
||||
new ColumnAnalysis(
|
||||
ValueType.LONG.toString(),
|
||||
false,
|
||||
12090 * 2,
|
||||
null,
|
||||
null
|
||||
),
|
||||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
21762,
|
||||
false,
|
||||
usingMmappedSegment ? 21762 : 0,
|
||||
1,
|
||||
null
|
||||
),
|
||||
"index",
|
||||
new ColumnAnalysis(
|
||||
ValueType.FLOAT.toString(),
|
||||
false,
|
||||
9672 * 2,
|
||||
null,
|
||||
null
|
||||
)
|
||||
),
|
||||
expectedSegmentAnalysis.getSize()*2,
|
||||
expectedSegmentAnalysis.getNumRows()*2
|
||||
expectedSegmentAnalysis.getSize() * 2,
|
||||
expectedSegmentAnalysis.getNumRows() * 2
|
||||
);
|
||||
|
||||
QueryToolChest toolChest = factory.getToolchest();
|
||||
QueryToolChest toolChest = FACTORY.getToolchest();
|
||||
|
||||
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
|
||||
ExecutorService exec = Executors.newCachedThreadPool();
|
||||
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(
|
||||
factory.mergeRunners(
|
||||
FACTORY.mergeRunners(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
|
||||
)
|
||||
|
@ -178,22 +355,23 @@ public class SegmentMetadataQueryTest
|
|||
"placement",
|
||||
new ColumnAnalysis(
|
||||
ValueType.STRING.toString(),
|
||||
false,
|
||||
0,
|
||||
0,
|
||||
null
|
||||
)
|
||||
),
|
||||
0,
|
||||
expectedSegmentAnalysis.getNumRows()*2
|
||||
expectedSegmentAnalysis.getNumRows() * 2
|
||||
);
|
||||
|
||||
QueryToolChest toolChest = factory.getToolchest();
|
||||
QueryToolChest toolChest = FACTORY.getToolchest();
|
||||
|
||||
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
|
||||
ExecutorService exec = Executors.newCachedThreadPool();
|
||||
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(
|
||||
factory.mergeRunners(
|
||||
FACTORY.mergeRunners(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
Lists.<QueryRunner<SegmentAnalysis>>newArrayList(singleSegmentQueryRunner, singleSegmentQueryRunner)
|
||||
)
|
||||
|
@ -230,13 +408,13 @@ public class SegmentMetadataQueryTest
|
|||
)
|
||||
);
|
||||
|
||||
QueryToolChest toolChest = factory.getToolchest();
|
||||
QueryToolChest toolChest = FACTORY.getToolchest();
|
||||
|
||||
QueryRunner singleSegmentQueryRunner = toolChest.preMergeQueryDecoration(runner);
|
||||
ExecutorService exec = Executors.newCachedThreadPool();
|
||||
QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
|
||||
toolChest.mergeResults(
|
||||
factory.mergeRunners(
|
||||
FACTORY.mergeRunners(
|
||||
MoreExecutors.sameThreadExecutor(),
|
||||
//Note: It is essential to have at least 2 query runners merged to reproduce the regression bug described in
|
||||
//https://github.com/druid-io/druid/pull/1172
|
||||
|
@ -273,14 +451,14 @@ public class SegmentMetadataQueryTest
|
|||
SegmentMetadataQuery.AnalysisType.SIZE
|
||||
);
|
||||
|
||||
Query query = mapper.readValue(queryStr, Query.class);
|
||||
Query query = MAPPER.readValue(queryStr, Query.class);
|
||||
Assert.assertTrue(query instanceof SegmentMetadataQuery);
|
||||
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
|
||||
Assert.assertEquals(new Interval("2013-12-04T00:00:00.000Z/2013-12-05T00:00:00.000Z"), query.getIntervals().get(0));
|
||||
Assert.assertEquals(expectedAnalysisTypes, ((SegmentMetadataQuery) query).getAnalysisTypes());
|
||||
|
||||
// test serialize and deserialize
|
||||
Assert.assertEquals(query, mapper.readValue(mapper.writeValueAsString(query), Query.class));
|
||||
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -290,14 +468,14 @@ public class SegmentMetadataQueryTest
|
|||
+ " \"queryType\":\"segmentMetadata\",\n"
|
||||
+ " \"dataSource\":\"test_ds\"\n"
|
||||
+ "}";
|
||||
Query query = mapper.readValue(queryStr, Query.class);
|
||||
Query query = MAPPER.readValue(queryStr, Query.class);
|
||||
Assert.assertTrue(query instanceof SegmentMetadataQuery);
|
||||
Assert.assertEquals("test_ds", Iterables.getOnlyElement(query.getDataSource().getNames()));
|
||||
Assert.assertEquals(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT), query.getIntervals().get(0));
|
||||
Assert.assertTrue(((SegmentMetadataQuery) query).isUsingDefaultInterval());
|
||||
|
||||
// test serialize and deserialize
|
||||
Assert.assertEquals(query, mapper.readValue(mapper.writeValueAsString(query), Query.class));
|
||||
Assert.assertEquals(query, MAPPER.readValue(MAPPER.writeValueAsString(query), Query.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -83,8 +83,8 @@ public class SearchQueryRunnerWithCaseTest
|
|||
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\ta\u0001preferred\t94.874713"
|
||||
);
|
||||
|
||||
IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input, true);
|
||||
IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input, false);
|
||||
IncrementalIndex index1 = TestIndex.makeRealtimeIndex(input);
|
||||
IncrementalIndex index2 = TestIndex.makeRealtimeIndex(input);
|
||||
|
||||
QueryableIndex index3 = TestIndex.persistRealtimeAndLoadMMapped(index1);
|
||||
QueryableIndex index4 = TestIndex.persistRealtimeAndLoadMMapped(index2);
|
||||
|
|
|
@ -121,7 +121,7 @@ public class TopNQueryQueryToolChestTest
|
|||
);
|
||||
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId)
|
||||
);
|
||||
|
||||
Map<String, Object> context = Maps.newHashMap();
|
||||
|
|
|
@ -106,7 +106,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
|
|||
TestCases.rtIndex,
|
||||
QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), segmentId)
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), segmentId)
|
||||
)
|
||||
);
|
||||
testCaseMap.put(
|
||||
|
@ -123,13 +123,6 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
|
|||
new QueryableIndexSegment(segmentId, TestIndex.mergedRealtimeIndex())
|
||||
)
|
||||
);
|
||||
testCaseMap.put(
|
||||
TestCases.rtIndexOffheap,
|
||||
QueryRunnerTestHelper.makeQueryRunner(
|
||||
factory,
|
||||
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(true), segmentId)
|
||||
)
|
||||
);
|
||||
//Thread.sleep(10000);
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ public class TestIndex
|
|||
"placementish",
|
||||
"partial_null_column",
|
||||
"null_column",
|
||||
};
|
||||
};
|
||||
public static final String[] METRICS = new String[]{"index"};
|
||||
private static final Logger log = new Logger(TestIndex.class);
|
||||
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
|
||||
|
@ -93,7 +93,7 @@ public class TestIndex
|
|||
private static QueryableIndex mmappedIndex = null;
|
||||
private static QueryableIndex mergedRealtime = null;
|
||||
|
||||
public static IncrementalIndex getIncrementalTestIndex(boolean useOffheap)
|
||||
public static IncrementalIndex getIncrementalTestIndex()
|
||||
{
|
||||
synchronized (log) {
|
||||
if (realtimeIndex != null) {
|
||||
|
@ -101,7 +101,7 @@ public class TestIndex
|
|||
}
|
||||
}
|
||||
|
||||
return realtimeIndex = makeRealtimeIndex("druid.sample.tsv", useOffheap);
|
||||
return realtimeIndex = makeRealtimeIndex("druid.sample.tsv");
|
||||
}
|
||||
|
||||
public static QueryableIndex getMMappedTestIndex()
|
||||
|
@ -112,7 +112,7 @@ public class TestIndex
|
|||
}
|
||||
}
|
||||
|
||||
IncrementalIndex incrementalIndex = getIncrementalTestIndex(false);
|
||||
IncrementalIndex incrementalIndex = getIncrementalTestIndex();
|
||||
mmappedIndex = persistRealtimeAndLoadMMapped(incrementalIndex);
|
||||
|
||||
return mmappedIndex;
|
||||
|
@ -126,8 +126,8 @@ public class TestIndex
|
|||
}
|
||||
|
||||
try {
|
||||
IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top", false);
|
||||
IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom", false);
|
||||
IncrementalIndex top = makeRealtimeIndex("druid.sample.tsv.top");
|
||||
IncrementalIndex bottom = makeRealtimeIndex("druid.sample.tsv.bottom");
|
||||
|
||||
File tmpFile = File.createTempFile("yay", "who");
|
||||
tmpFile.delete();
|
||||
|
@ -163,7 +163,7 @@ public class TestIndex
|
|||
}
|
||||
}
|
||||
|
||||
private static IncrementalIndex makeRealtimeIndex(final String resourceFilename, final boolean useOffheap)
|
||||
private static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
|
||||
{
|
||||
final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
|
||||
if (resource == null) {
|
||||
|
@ -171,10 +171,10 @@ public class TestIndex
|
|||
}
|
||||
log.info("Realtime loading index file[%s]", resource);
|
||||
CharSource stream = Resources.asByteSource(resource).asCharSource(Charsets.UTF_8);
|
||||
return makeRealtimeIndex(stream, useOffheap);
|
||||
return makeRealtimeIndex(stream);
|
||||
}
|
||||
|
||||
public static IncrementalIndex makeRealtimeIndex(final CharSource source, final boolean useOffheap)
|
||||
public static IncrementalIndex makeRealtimeIndex(final CharSource source)
|
||||
{
|
||||
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
|
||||
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
|
||||
|
|
Loading…
Reference in New Issue