Support min/max values for metadata query (#2208)

This commit is contained in:
navis.ryu 2016-01-06 13:32:38 +09:00
parent 47d48e1e67
commit dd2375477a
15 changed files with 463 additions and 85 deletions

View File

@ -25,7 +25,7 @@ To determine which nodes to forward queries to, the Broker node first builds a v
Caching
-------
Broker nodes employ a cache with a LRU cache invalidation strategy. The broker cache stores per segment results. The cache can be local to each broker node or shared across multiple nodes using an external distributed cache such as [memcached](http://memcached.org/). Each time a broker node receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache and the results can be directly pulled from the cache. For any segment results that do not exist in the cache, the broker node will forward the query to the
Broker nodes employ a cache with a LRU cache invalidation strategy. The broker cache stores per-segment results. The cache can be local to each broker node or shared across multiple nodes using an external distributed cache such as [memcached](http://memcached.org/). Each time a broker node receives a query, it first maps the query to a set of segments. A subset of these segment results may already exist in the cache and the results can be directly pulled from the cache. For any segment results that do not exist in the cache, the broker node will forward the query to the
historical nodes. Once the historical nodes return their results, the broker will store those results in the cache. Real-time segments are never cached and hence requests for real-time data will always be forwarded to real-time nodes. Real-time data is perpetually changing and caching the results would be unreliable.
HTTP Endpoints

View File

@ -90,7 +90,7 @@ Druid is a column store, which means each individual column is stored separately
in that query, and Druid is pretty good about only scanning exactly what it needs for a query.
Different columns can also employ different compression methods. Different columns can also have different indexes associated with them.
Druid indexes data on a per shard (segment) level.
Druid indexes data on a per-shard (segment) level.
## Loading the Data

View File

@ -2,9 +2,10 @@
layout: doc_page
---
# Segment Metadata Queries
Segment metadata queries return per segment information about:
Segment metadata queries return per-segment information about:
* Cardinality of all columns in the segment
* Min/max values of string type columns in the segment
* Estimated byte size for the segment columns if they were stored in a flat format
* Number of rows stored inside the segment
* Interval the segment covers
@ -103,13 +104,17 @@ This is a list of properties that determines the amount of information returned
By default, all analysis types will be used. If a property is not needed, omitting it from this list will result in a more efficient query.
There are four types of column analyses:
There are five types of column analyses:
#### cardinality
* `cardinality` in the result will return the estimated floor of cardinality for each column. Only relevant for
dimension columns.
#### minmax
* Estimated min/max values for each column. Only relevant for dimension columns.
#### size
* `size` in the result will contain the estimated total segment byte size as if the data were stored in text format

View File

@ -21,14 +21,21 @@ package io.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularity;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
@ -38,8 +45,10 @@ import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.EnumSet;
@ -104,7 +113,11 @@ public class SegmentAnalyzer
analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT);
break;
case STRING:
analysis = analyzeStringColumn(capabilities, column, storageAdapter.getDimensionCardinality(columnName));
if (index != null) {
analysis = analyzeStringColumn(capabilities, column);
} else {
analysis = analyzeStringColumn(capabilities, storageAdapter, columnName);
}
break;
case COMPLEX:
analysis = analyzeComplexColumn(capabilities, column, storageAdapter.getColumnTypeName(columnName));
@ -140,6 +153,11 @@ public class SegmentAnalyzer
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.CARDINALITY);
}
public boolean analyzingMinMax()
{
return analysisTypes.contains(SegmentMetadataQuery.AnalysisType.MINMAX);
}
private ColumnAnalysis analyzeNumericColumn(
final ColumnCapabilities capabilities,
final int length,
@ -161,28 +179,30 @@ public class SegmentAnalyzer
capabilities.hasMultipleValues(),
size,
null,
null,
null,
null
);
}
private ColumnAnalysis analyzeStringColumn(
final ColumnCapabilities capabilities,
@Nullable final Column column,
final int cardinality
final Column column
)
{
long size = 0;
if (column != null && analyzingSize()) {
Comparable min = null;
Comparable max = null;
if (!capabilities.hasBitmapIndexes()) {
return ColumnAnalysis.error("string_no_bitmap");
}
final BitmapIndex bitmapIndex = column.getBitmapIndex();
if (cardinality != bitmapIndex.getCardinality()) {
return ColumnAnalysis.error("bitmap_wrong_cardinality");
}
final int cardinality = bitmapIndex.getCardinality();
if (analyzingSize()) {
for (int i = 0; i < cardinality; ++i) {
String value = bitmapIndex.getValue(i);
if (value != null) {
@ -191,11 +211,91 @@ public class SegmentAnalyzer
}
}
if (analyzingMinMax() && cardinality > 0) {
min = Strings.nullToEmpty(bitmapIndex.getValue(0));
max = Strings.nullToEmpty(bitmapIndex.getValue(cardinality - 1));
}
return new ColumnAnalysis(
capabilities.getType().name(),
capabilities.hasMultipleValues(),
size,
analyzingCardinality() ? cardinality : 0,
min,
max,
null
);
}
private ColumnAnalysis analyzeStringColumn(
final ColumnCapabilities capabilities,
final StorageAdapter storageAdapter,
final String columnName
)
{
int cardinality = 0;
long size = 0;
Comparable min = null;
Comparable max = null;
if (analyzingCardinality()) {
cardinality = storageAdapter.getDimensionCardinality(columnName);
}
if (analyzingSize()) {
final long start = storageAdapter.getMinTime().getMillis();
final long end = storageAdapter.getMaxTime().getMillis();
final Sequence<Cursor> cursors =
storageAdapter.makeCursors(null, new Interval(start, end), QueryGranularity.ALL, false);
size = cursors.accumulate(
0L,
new Accumulator<Long, Cursor>()
{
@Override
public Long accumulate(Long accumulated, Cursor cursor)
{
DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(
columnName,
columnName
)
);
if (selector == null) {
return accumulated;
}
long current = accumulated;
while (!cursor.isDone()) {
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (dimVal != null && !dimVal.isEmpty()) {
current += StringUtils.toUtf8(dimVal).length;
}
}
cursor.advance();
}
return current;
}
}
);
}
if (analyzingMinMax()) {
min = storageAdapter.getMinValue(columnName);
max = storageAdapter.getMaxValue(columnName);
}
return new ColumnAnalysis(
capabilities.getType().name(),
capabilities.hasMultipleValues(),
size,
cardinality,
min,
max,
null
);
}
@ -218,7 +318,7 @@ public class SegmentAnalyzer
final Function<Object, Long> inputSizeFn = serde.inputSizeFn();
if (inputSizeFn == null) {
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null);
return new ColumnAnalysis(typeName, hasMultipleValues, 0, null, null, null, null);
}
final int length = column.getLength();
@ -232,6 +332,8 @@ public class SegmentAnalyzer
hasMultipleValues,
size,
null,
null,
null,
null
);
}

View File

@ -21,6 +21,7 @@ package io.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Objects;
@ -32,13 +33,15 @@ public class ColumnAnalysis
public static ColumnAnalysis error(String reason)
{
return new ColumnAnalysis("STRING", false, -1, null, ERROR_PREFIX + reason);
return new ColumnAnalysis("STRING", false, -1, null, null, null, ERROR_PREFIX + reason);
}
private final String type;
private final boolean hasMultipleValues;
private final long size;
private final Integer cardinality;
private final Comparable minValue;
private final Comparable maxValue;
private final String errorMessage;
@JsonCreator
@ -47,6 +50,8 @@ public class ColumnAnalysis
@JsonProperty("hasMultipleValues") boolean hasMultipleValues,
@JsonProperty("size") long size,
@JsonProperty("cardinality") Integer cardinality,
@JsonProperty("minValue") Comparable minValue,
@JsonProperty("maxValue") Comparable maxValue,
@JsonProperty("errorMessage") String errorMessage
)
{
@ -54,6 +59,8 @@ public class ColumnAnalysis
this.hasMultipleValues = hasMultipleValues;
this.size = size;
this.cardinality = cardinality;
this.minValue = minValue;
this.maxValue = maxValue;
this.errorMessage = errorMessage;
}
@ -81,6 +88,20 @@ public class ColumnAnalysis
return cardinality;
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonProperty
public Comparable getMinValue()
{
return minValue;
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonProperty
public Comparable getMaxValue()
{
return maxValue;
}
@JsonProperty
public String getErrorMessage()
{
@ -113,21 +134,29 @@ public class ColumnAnalysis
Integer cardinality = getCardinality();
final Integer rhsCardinality = rhs.getCardinality();
if (cardinality == null) {
cardinality = rhsCardinality;
} else {
if (rhsCardinality != null) {
} else if (rhsCardinality != null) {
cardinality = Math.max(cardinality, rhsCardinality);
}
final boolean multipleValues = hasMultipleValues || rhs.isHasMultipleValues();
Comparable newMin = choose(minValue, rhs.minValue, false);
Comparable newMax = choose(maxValue, rhs.maxValue, true);
return new ColumnAnalysis(type, multipleValues, size + rhs.getSize(), cardinality, newMin, newMax, null);
}
return new ColumnAnalysis(
type,
hasMultipleValues || rhs.isHasMultipleValues(),
size + rhs.getSize(),
cardinality,
null
);
private <T extends Comparable> T choose(T obj1, T obj2, boolean max)
{
if (obj1 == null) {
return max ? obj2 : null;
}
if (obj2 == null) {
return max ? obj1 : null;
}
int compare = max ? obj1.compareTo(obj2) : obj2.compareTo(obj1);
return compare > 0 ? obj1 : obj2;
}
@Override
@ -138,6 +167,8 @@ public class ColumnAnalysis
", hasMultipleValues=" + hasMultipleValues +
", size=" + size +
", cardinality=" + cardinality +
", minValue=" + minValue +
", maxValue=" + maxValue +
", errorMessage='" + errorMessage + '\'' +
'}';
}
@ -156,12 +187,14 @@ public class ColumnAnalysis
size == that.size &&
Objects.equals(type, that.type) &&
Objects.equals(cardinality, that.cardinality) &&
Objects.equals(minValue, that.minValue) &&
Objects.equals(maxValue, that.maxValue) &&
Objects.equals(errorMessage, that.errorMessage);
}
@Override
public int hashCode()
{
return Objects.hash(type, hasMultipleValues, size, cardinality, errorMessage);
return Objects.hash(type, hasMultipleValues, size, cardinality, minValue, maxValue, errorMessage);
}
}

View File

@ -53,7 +53,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
CARDINALITY,
SIZE,
INTERVAL,
AGGREGATORS;
AGGREGATORS,
MINMAX;
@JsonValue
@Override
@ -81,7 +82,8 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
public static final EnumSet<AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
AnalysisType.CARDINALITY,
AnalysisType.SIZE,
AnalysisType.INTERVAL
AnalysisType.INTERVAL,
AnalysisType.MINMAX
);
private final ColumnIncluderator toInclude;
@ -177,6 +179,11 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
return analysisTypes.contains(AnalysisType.AGGREGATORS);
}
public boolean hasMinMax()
{
return analysisTypes.contains(AnalysisType.MINMAX);
}
public byte[] getAnalysisTypesCacheKey()
{
int size = 1;
@ -242,6 +249,20 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
);
}
public Query<SegmentAnalysis> withColumns(ColumnIncluderator includerator)
{
return new SegmentMetadataQuery(
getDataSource(),
getQuerySegmentSpec(),
includerator,
merge,
getContext(),
analysisTypes,
usingDefaultInterval,
lenientAggregatorMerge
);
}
@Override
public String toString()
{

View File

@ -35,6 +35,7 @@ import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
@ -140,6 +141,28 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
@Override
public Comparable getMinValue(String dimension)
{
Column column = index.getColumn(dimension);
if (column != null && column.getCapabilities().hasBitmapIndexes()) {
BitmapIndex bitmap = column.getBitmapIndex();
return bitmap.getCardinality() > 0 ? bitmap.getValue(0) : null;
}
return null;
}
@Override
public Comparable getMaxValue(String dimension)
{
Column column = index.getColumn(dimension);
if (column != null && column.getCapabilities().hasBitmapIndexes()) {
BitmapIndex bitmap = column.getBitmapIndex();
return bitmap.getCardinality() > 0 ? bitmap.getValue(bitmap.getCardinality() - 1) : null;
}
return null;
}
@Override
public Capabilities getCapabilities()
{

View File

@ -44,6 +44,8 @@ public interface StorageAdapter extends CursorFactory
public int getDimensionCardinality(String column);
public DateTime getMinTime();
public DateTime getMaxTime();
public Comparable getMinValue(String column);
public Comparable getMaxValue(String column);
public Capabilities getCapabilities();
public ColumnCapabilities getColumnCapabilities(String column);

View File

@ -51,6 +51,7 @@ import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
@ -847,6 +848,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public int size();
public String getMinValue();
public String getMaxValue();
public int add(String value);
public SortedDimLookup sort();
@ -899,6 +904,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return delegate.size();
}
@Override
public String getMinValue()
{
return Strings.nullToEmpty(delegate.getMinValue());
}
@Override
public String getMaxValue()
{
return Strings.nullToEmpty(delegate.getMaxValue());
}
@Override
public int add(String value)
{

View File

@ -136,6 +136,20 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return index.getMaxTime();
}
@Override
public Comparable getMinValue(String column)
{
IncrementalIndex.DimDim dimDim = index.getDimensionValues(column);
return dimDim == null ? null : dimDim.getMinValue();
}
@Override
public Comparable getMaxValue(String column)
{
IncrementalIndex.DimDim dimDim = index.getDimensionValues(column);
return dimDim == null ? null : dimDim.getMaxValue();
}
@Override
public Capabilities getCapabilities()
{

View File

@ -278,6 +278,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
static class OnHeapDimDim implements DimDim
{
private final Map<String, Integer> valueToId = Maps.newHashMap();
private String minValue = null;
private String maxValue = null;
private final List<String> idToValue = Lists.newArrayList();
private final Object lock;
@ -326,10 +328,24 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
final int index = size();
valueToId.put(value, index);
idToValue.add(value);
minValue = minValue == null || minValue.compareTo(value) > 0 ? value : minValue;
maxValue = maxValue == null || maxValue.compareTo(value) < 0 ? value : maxValue;
return index;
}
}
@Override
public String getMinValue()
{
return minValue;
}
@Override
public String getMaxValue()
{
return maxValue;
}
public OnHeapDimLookup sort()
{
synchronized (lock) {

View File

@ -63,7 +63,7 @@ public class SegmentMetadataQueryQueryToolChestTest
new SegmentMetadataQueryQueryToolChest(null).getCacheStrategy(query);
// Test cache key generation
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01, 0x02};
byte[] expectedKey = {0x04, 0x01, (byte) 0xFF, 0x00, 0x01, 0x02, 0x04};
byte[] actualKey = strategy.computeCacheKey(query);
Assert.assertArrayEquals(expectedKey, actualKey);
@ -79,6 +79,8 @@ public class SegmentMetadataQueryQueryToolChestTest
true,
10881,
1,
"preferred",
"preferred",
null
)
), 71982,

View File

@ -153,14 +153,18 @@ public class SegmentMetadataQueryTest
false,
12090,
null,
null,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
mmap1 ? 10881 : 0,
mmap1 ? 10881 : 10764,
1,
"preferred",
"preferred",
null
),
"index",
@ -169,9 +173,11 @@ public class SegmentMetadataQueryTest
false,
9672,
null,
null,
null,
null
)
), mmap1 ? 71982 : 32643,
), mmap1 ? 71982 : 72755,
1209,
null
);
@ -187,6 +193,8 @@ public class SegmentMetadataQueryTest
false,
12090,
null,
null,
null,
null
),
"placement",
@ -195,6 +203,8 @@ public class SegmentMetadataQueryTest
false,
mmap2 ? 10881 : 0,
1,
null,
null,
null
),
"index",
@ -203,9 +213,12 @@ public class SegmentMetadataQueryTest
false,
9672,
null,
null,
null,
null
)
), mmap2 ? 71982 : 32643,
// null_column will be included only for incremental index, which makes a little bigger result than expected
), mmap2 ? 71982 : 72755,
1209,
null
);
@ -236,6 +249,8 @@ public class SegmentMetadataQueryTest
false,
0,
1,
null,
null,
null
),
"placementish",
@ -244,6 +259,8 @@ public class SegmentMetadataQueryTest
true,
0,
9,
null,
null,
null
)
),
@ -298,6 +315,8 @@ public class SegmentMetadataQueryTest
false,
0,
1,
null,
null,
null
),
"quality_uniques",
@ -306,6 +325,8 @@ public class SegmentMetadataQueryTest
false,
0,
null,
null,
null,
null
)
),
@ -349,6 +370,53 @@ public class SegmentMetadataQueryTest
@Test
public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
{
ColumnAnalysis analysis = new ColumnAnalysis(
ValueType.STRING.toString(),
false,
(mmap1 ? 10881 : 10764) + (mmap2 ? 10881 : 10764),
1,
"preferred",
"preferred",
null
);
testSegmentMetadataQueryWithDefaultAnalysisMerge("placement", analysis);
}
@Test
public void testSegmentMetadataQueryWithDefaultAnalysisMerge2()
{
ColumnAnalysis analysis = new ColumnAnalysis(
ValueType.STRING.toString(),
false,
(mmap1 ? 6882 : 6808) + (mmap2 ? 6882 : 6808),
3,
"spot",
"upfront",
null
);
testSegmentMetadataQueryWithDefaultAnalysisMerge("market", analysis);
}
@Test
public void testSegmentMetadataQueryWithDefaultAnalysisMerge3()
{
ColumnAnalysis analysis = new ColumnAnalysis(
ValueType.STRING.toString(),
false,
(mmap1 ? 9765 : 9660) + (mmap2 ? 9765 : 9660),
9,
"automotive",
"travel",
null
);
testSegmentMetadataQueryWithDefaultAnalysisMerge("quality", analysis);
}
private void testSegmentMetadataQueryWithDefaultAnalysisMerge(
String column,
ColumnAnalysis analysis
)
{
SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
differentIds ? "merged" : "testSegment",
@ -360,14 +428,8 @@ public class SegmentMetadataQueryTest
false,
12090 * 2,
null,
null
),
"placement",
new ColumnAnalysis(
ValueType.STRING.toString(),
false,
10881 * ((mmap1 ? 1 : 0) + (mmap2 ? 1 : 0)),
1,
null,
null,
null
),
"index",
@ -376,8 +438,12 @@ public class SegmentMetadataQueryTest
false,
9672 * 2,
null,
null,
null,
null
)
),
column,
analysis
),
expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(),
expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
@ -400,12 +466,11 @@ public class SegmentMetadataQueryTest
toolChest
);
Query query = testQuery.withColumns(new ListColumnIncluderator(Arrays.asList("__time", "index", column)));
TestHelper.assertExpectedObjects(
ImmutableList.of(mergedSegmentAnalysis),
myRunner.run(
testQuery,
Maps.newHashMap()
),
myRunner.run(query, Maps.newHashMap()),
"failed SegmentMetadata merging query"
);
exec.shutdownNow();
@ -424,6 +489,8 @@ public class SegmentMetadataQueryTest
false,
0,
0,
null,
null,
null
)
),
@ -482,6 +549,8 @@ public class SegmentMetadataQueryTest
false,
0,
0,
null,
null,
null
)
),

View File

@ -19,75 +19,149 @@
package io.druid.query.metadata.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
public class ColumnAnalysisTest
{
@Test
public void testFoldStringColumns()
private final ObjectMapper MAPPER = TestHelper.getObjectMapper();
private void assertSerDe(ColumnAnalysis analysis) throws Exception
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", true, 3L, 4, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", true, 4L, 4, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
Assert.assertEquals(analysis, MAPPER.readValue(MAPPER.writeValueAsString(analysis), ColumnAnalysis.class));
}
@Test
public void testFoldWithNull()
public void testFoldStringColumns() throws Exception
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null);
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, "aaA", "Zzz", null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", true, 3L, 4, "aAA", "ZZz", null);
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", true, 4L, 4, "aAA", "Zzz", null);
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
@Test
public void testFoldWithNull() throws Exception
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("STRING", false, 1L, 2, null, null, null);
Assert.assertEquals(analysis1, analysis1.fold(null));
assertSerDe(analysis1);
}
@Test
public void testFoldComplexColumns()
public void testFoldComplexColumns() throws Exception
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
final ColumnAnalysis expected = new ColumnAnalysis("hyperUnique", false, 0L, null, null);
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 0L, null, null, null, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("hyperUnique", false, 0L, null, null, null, null);
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis("hyperUnique", false, 0L, null, null, null, null);
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
@Test
public void testFoldDifferentTypes()
public void testFoldDifferentTypes() throws Exception
{
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 1L, 1, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("COMPLEX", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:cannot_merge_diff_types");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
final ColumnAnalysis analysis1 = new ColumnAnalysis("hyperUnique", false, 1L, 1, null, null, null);
final ColumnAnalysis analysis2 = new ColumnAnalysis("COMPLEX", false, 2L, 2, null, null, null);
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis(
"STRING",
false,
-1L,
null,
null,
null,
"error:cannot_merge_diff_types"
);
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
@Test
public void testFoldSameErrors()
public void testFoldSameErrors() throws Exception
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("foo");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, null, null, "error:foo");
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
@Test
public void testFoldErrorAndNoError()
public void testFoldErrorAndNoError() throws Exception
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", false, 2L, 2, null);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:foo");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
final ColumnAnalysis analysis2 = new ColumnAnalysis("STRING", false, 2L, 2, "a", "z", null);
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, null, null, "error:foo");
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
@Test
public void testFoldDifferentErrors()
public void testFoldDifferentErrors() throws Exception
{
final ColumnAnalysis analysis1 = ColumnAnalysis.error("foo");
final ColumnAnalysis analysis2 = ColumnAnalysis.error("bar");
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, "error:multiple_errors");
Assert.assertEquals(expected, analysis1.fold(analysis2));
Assert.assertEquals(expected, analysis2.fold(analysis1));
assertSerDe(analysis1);
assertSerDe(analysis2);
final ColumnAnalysis expected = new ColumnAnalysis("STRING", false, -1L, null, null, null, "error:multiple_errors");
ColumnAnalysis fold1 = analysis1.fold(analysis2);
ColumnAnalysis fold2 = analysis2.fold(analysis1);
Assert.assertEquals(expected, fold1);
Assert.assertEquals(expected, fold2);
assertSerDe(fold1);
assertSerDe(fold2);
}
}

View File

@ -419,7 +419,7 @@ first maps the query to a set of segments. Results for certain segments may
already exist in the cache and there is no need to recompute them. For any
results that do not exist in the cache, the broker node will forward the query
to the correct historical and real-time nodes. Once historical nodes return
their results, the broker will cache these results on a per segment basis for
their results, the broker will cache these results on a per-segment basis for
future use. This process is illustrated in Figure~\ref{fig:caching}. Real-time
data is never cached and hence requests for real-time data will always be
forwarded to real-time nodes. Real-time data is perpetually changing and
@ -428,7 +428,7 @@ caching the results is unreliable.
\begin{figure*}
\centering
\includegraphics[width = 4.5in]{caching}
\caption{Results are cached per segment. Queries combine cached results with results computed on historical and real-time nodes.}
\caption{Results are cached per-segment. Queries combine cached results with results computed on historical and real-time nodes.}
\label{fig:caching}
\end{figure*}