Ability to directly query row-based datasources. (#9502)

* Ability to directly query row-based datasources.

Includes:

- Foundational classes RowBasedSegment, RowBasedStorageAdapter,
  RowBasedCursor provide a queryable interface on top of a
  RowBasedColumnSelectorFactory.
- Add LookupSegment: A RowBasedSegment that is built on lookup data.
- Improve capability reporting in RowBasedColumnSelectorFactory.

* Fix import.

* Remove unthrown IOException.
This commit is contained in:
Gian Merlino 2020-03-10 20:39:01 -07:00 committed by GitHub
parent c74749f0f4
commit 4f085896c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1724 additions and 40 deletions

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.lookup.LookupColumnSelectorFactory;
import org.apache.druid.timeline.SegmentId;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;
/**
* A {@link org.apache.druid.segment.Segment} that is based on a {@link LookupExtractor}. Allows direct
* querying of lookups. The lookup must support {@link LookupExtractor#iterable()}.
*/
public class LookupSegment extends RowBasedSegment<Map.Entry<String, String>>
{
private static final Map<String, ValueType> ROW_SIGNATURE = ImmutableMap.of(
LookupColumnSelectorFactory.KEY_COLUMN, ValueType.STRING,
LookupColumnSelectorFactory.VALUE_COLUMN, ValueType.STRING
);
public LookupSegment(final String lookupName, final LookupExtractorFactory lookupExtractorFactory)
{
super(
SegmentId.dummy(lookupName),
() -> {
final LookupExtractor extractor = lookupExtractorFactory.get();
if (!extractor.canIterate()) {
throw new ISE("Cannot iterate lookup[%s]", lookupExtractorFactory);
}
return extractor.iterable().iterator();
},
new RowAdapter<Map.Entry<String, String>>()
{
@Override
public ToLongFunction<Map.Entry<String, String>> timestampFunction()
{
// No timestamps for lookups.
return row -> 0L;
}
@Override
public Function<Map.Entry<String, String>, Object> columnFunction(String columnName)
{
if (LookupColumnSelectorFactory.KEY_COLUMN.equals(columnName)) {
return Map.Entry::getKey;
} else if (LookupColumnSelectorFactory.VALUE_COLUMN.equals(columnName)) {
return Map.Entry::getValue;
} else {
return row -> null;
}
}
},
ROW_SIGNATURE
);
}
}

View File

@ -87,20 +87,31 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
}
@Nullable
public static ColumnCapabilities getColumnCapabilities(
static ColumnCapabilities getColumnCapabilities(
final Map<String, ValueType> rowSignature,
final String columnName
)
{
if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
// TIME_COLUMN_NAME is handled specially; override the provided rowSignature.
return new ColumnCapabilitiesImpl().setType(ValueType.LONG);
return new ColumnCapabilitiesImpl().setType(ValueType.LONG).setIsComplete(true);
} else {
final ValueType valueType = rowSignature.get(columnName);
// Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things.
// Do set hasMultipleValues, because we might return multiple values.
return valueType != null ? new ColumnCapabilitiesImpl().setType(valueType).setHasMultipleValues(true) : null;
if (valueType != null) {
return new ColumnCapabilitiesImpl()
.setType(valueType)
// Non-numeric types might have multiple values
.setHasMultipleValues(!valueType.isNumeric())
// Numeric types should be reported as complete, but not STRING or COMPLEX (because we don't have full info)
.setIsComplete(valueType.isNumeric());
} else {
return null;
}
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.BooleanValueMatcher;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.function.ToLongFunction;
/**
* A {@link Cursor} that is based on a stream of objects. Generally created by a {@link RowBasedStorageAdapter}.
*
* @see RowBasedSegment#RowBasedSegment for implementation notes
*/
public class RowBasedCursor<RowType> implements Cursor
{
private final RowWalker<RowType> rowWalker;
private final ToLongFunction<RowType> timestampFunction;
private final Interval interval;
private final boolean descending;
private final DateTime cursorTime;
private final ColumnSelectorFactory columnSelectorFactory;
private final ValueMatcher valueMatcher;
RowBasedCursor(
final RowWalker<RowType> rowWalker,
final RowAdapter<RowType> rowAdapter,
@Nullable final Filter filter,
final Interval interval,
final VirtualColumns virtualColumns,
final Granularity gran,
final boolean descending,
final Map<String, ValueType> rowSignature
)
{
this.rowWalker = rowWalker;
this.timestampFunction = rowAdapter.timestampFunction();
this.interval = interval;
this.descending = descending;
this.cursorTime = gran.toDateTime(interval.getStartMillis());
this.columnSelectorFactory = virtualColumns.wrap(
RowBasedColumnSelectorFactory.create(
rowAdapter,
rowWalker::currentRow,
rowSignature,
false
)
);
if (filter == null) {
this.valueMatcher = BooleanValueMatcher.of(true);
} else {
this.valueMatcher = filter.makeMatcher(this.columnSelectorFactory);
}
rowWalker.skipToDateTime(descending ? interval.getEnd().minus(1) : interval.getStart(), descending);
advanceToMatchingRow();
}
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
return columnSelectorFactory;
}
@Override
public DateTime getTime()
{
return cursorTime;
}
@Override
public void advance()
{
advanceUninterruptibly();
BaseQuery.checkInterrupted();
}
@Override
public void advanceUninterruptibly()
{
rowWalker.advance();
advanceToMatchingRow();
}
@Override
public boolean isDone()
{
return rowWalker.isDone() || !interval.contains(timestampFunction.applyAsLong(rowWalker.currentRow()));
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
rowWalker.reset();
rowWalker.skipToDateTime(descending ? interval.getEnd().minus(1) : interval.getStart(), descending);
advanceToMatchingRow();
}
private void advanceToMatchingRow()
{
while (!isDone() && !valueMatcher.matches()) {
rowWalker.advance();
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
/**
* A {@link Segment} that is based on a stream of objects.
*/
public class RowBasedSegment<RowType> extends AbstractSegment
{
private final SegmentId segmentId;
private final StorageAdapter storageAdapter;
/**
* Create a row-based segment.
*
* The provided "rowIterable" must be in time-order according to the provided {@link RowAdapter#timestampFunction()}.
* The cursor returned by {@link RowBasedStorageAdapter#makeCursors} makes no attempt to verify this, and callers
* will expect it.
*
* The provided "rowSignature" will be used for reporting available columns and their capabilities to users of
* {@link #asStorageAdapter()}. Note that the {@link ColumnSelectorFactory} implementation returned by this segment's
* storage adapter will allow creation of selectors on any field, using the {@link RowAdapter#columnFunction} for that
* field, even if it doesn't appear in "rowSignature".
*
* @param segmentId segment identifier; will be returned by {@link #getId()}
* @param rowIterable objects that comprise this segment
* @param rowAdapter adapter used for reading these objects
* @param rowSignature signature of the columns in these objects
*/
public RowBasedSegment(
final SegmentId segmentId,
final Iterable<RowType> rowIterable,
final RowAdapter<RowType> rowAdapter,
final Map<String, ValueType> rowSignature
)
{
this.segmentId = Preconditions.checkNotNull(segmentId, "segmentId");
this.storageAdapter = new RowBasedStorageAdapter<>(
rowIterable,
rowAdapter,
rowSignature
);
}
@Override
@Nonnull
public SegmentId getId()
{
return segmentId;
}
@Override
@Nonnull
public Interval getDataInterval()
{
return storageAdapter.getInterval();
}
@Nullable
@Override
public QueryableIndex asQueryableIndex()
{
return null;
}
@Override
@Nonnull
public StorageAdapter asStorageAdapter()
{
return storageAdapter;
}
@Override
public void close()
{
// Do nothing.
}
}

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A {@link StorageAdapter} that is based on a stream of objects. Generally created by a {@link RowBasedSegment}.
*
* @see RowBasedSegment#RowBasedSegment for implementation notes
*/
public class RowBasedStorageAdapter<RowType> implements StorageAdapter
{
private final Iterable<RowType> rowIterable;
private final RowAdapter<RowType> rowAdapter;
private final Map<String, ValueType> rowSignature;
RowBasedStorageAdapter(
final Iterable<RowType> rowIterable,
final RowAdapter<RowType> rowAdapter,
final Map<String, ValueType> rowSignature
)
{
this.rowIterable = Preconditions.checkNotNull(rowIterable, "rowIterable");
this.rowAdapter = Preconditions.checkNotNull(rowAdapter, "rowAdapter");
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
}
@Override
public Interval getInterval()
{
return Intervals.ETERNITY;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return new ListIndexed<>(new ArrayList<>(rowSignature.keySet()));
}
@Override
public Iterable<String> getAvailableMetrics()
{
return Collections.emptyList();
}
@Override
public int getDimensionCardinality(String column)
{
return Integer.MAX_VALUE;
}
@Override
public DateTime getMinTime()
{
return getInterval().getStart();
}
@Override
public DateTime getMaxTime()
{
return getInterval().getEnd().minus(1);
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
return null;
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
return null;
}
@Override
public Capabilities getCapabilities()
{
return Capabilities.builder().dimensionValuesSorted(false).build();
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return RowBasedColumnSelectorFactory.getColumnCapabilities(rowSignature, column);
}
@Nullable
@Override
public String getColumnTypeName(String column)
{
final ColumnCapabilities columnCapabilities = getColumnCapabilities(column);
return columnCapabilities != null ? columnCapabilities.getType().toString() : null;
}
@Override
public int getNumRows()
{
throw new UnsupportedOperationException("Cannot retrieve number of rows");
}
@Override
public DateTime getMaxIngestedEventTime()
{
return getMaxTime();
}
@Override
public Metadata getMetadata()
{
throw new UnsupportedOperationException("Cannot retrieve metadata");
}
@Override
public Sequence<Cursor> makeCursors(
@Nullable final Filter filter,
final Interval queryInterval,
final VirtualColumns virtualColumns,
final Granularity gran,
final boolean descending,
@Nullable final QueryMetrics<?> queryMetrics
)
{
final Interval actualInterval = queryInterval.overlap(new Interval(getMinTime(), gran.bucketEnd(getMaxTime())));
if (actualInterval == null) {
return Sequences.empty();
}
final RowWalker<RowType> rowWalker = new RowWalker<>(
descending ? reverse(rowIterable) : rowIterable,
rowAdapter
);
final Iterable<Interval> bucketIntervals = gran.getIterable(actualInterval);
return Sequences.simple(
Iterables.transform(
descending ? reverse(bucketIntervals) : bucketIntervals,
bucketInterval ->
new RowBasedCursor<>(
rowWalker,
rowAdapter,
filter,
bucketInterval,
virtualColumns,
gran,
descending,
rowSignature
)
)
);
}
/**
* Reverse an Iterable. Will avoid materialization if possible, but, this is not always possible.
*/
private static <T> Iterable<T> reverse(final Iterable<T> iterable)
{
if (iterable instanceof List) {
//noinspection unchecked, rawtypes
return Lists.reverse((List) iterable);
} else {
// Materialize and reverse the objects. Note that this means reversing non-List Iterables will use extra memory.
return Lists.reverse(Lists.newArrayList(iterable));
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Preconditions;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.function.ToLongFunction;
/**
* Used by {@link RowBasedStorageAdapter} and {@link RowBasedCursor} to walk through rows. It allows multiple
* {@link RowBasedCursor} to share the same underlying Iterable.
*/
public class RowWalker<T>
{
private final Iterable<T> rowIterable;
private final ToLongFunction<T> timestampFunction;
private Iterator<T> rowIterator;
@Nullable
private T current = null;
RowWalker(final Iterable<T> rowIterable, final RowAdapter<T> rowAdapter)
{
this.rowIterable = rowIterable;
this.timestampFunction = rowAdapter.timestampFunction();
reset();
}
public boolean isDone()
{
return current == null;
}
public T currentRow()
{
return Preconditions.checkNotNull(current, "cannot call currentRow when isDone == true");
}
public void advance()
{
if (rowIterator.hasNext()) {
current = rowIterator.next();
if (current == null) {
throw new NullPointerException("null row encountered in walker");
}
} else {
current = null;
}
}
public void reset()
{
rowIterator = rowIterable.iterator();
advance();
}
public void skipToDateTime(final DateTime timestamp, final boolean descending)
{
while (current != null && (descending
? timestamp.isBefore(timestampFunction.applyAsLong(current))
: timestamp.isAfter(timestampFunction.applyAsLong(current)))) {
advance();
}
}
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.lookup;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.RowBasedStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class LookupSegmentTest
{
public static final String LOOKUP_NAME = "mylookup";
public static final Map<String, String> LOOKUP_MAP =
ImmutableSortedMap.of(
"a", "b",
"x", "y"
);
public static final LookupExtractorFactory LOOKUP_EXTRACTOR_FACTORY =
new LookupExtractorFactory()
{
@Override
public boolean start()
{
return true;
}
@Override
public boolean close()
{
return true;
}
@Override
public boolean replaces(@Nullable LookupExtractorFactory other)
{
return true;
}
@Nullable
@Override
public LookupIntrospectHandler getIntrospectHandler()
{
throw new UnsupportedOperationException("not needed for this test");
}
@Override
public LookupExtractor get()
{
return new MapLookupExtractor(LOOKUP_MAP, false);
}
};
private static final LookupSegment LOOKUP_SEGMENT = new LookupSegment(LOOKUP_NAME, LOOKUP_EXTRACTOR_FACTORY);
@Test
public void test_getId()
{
Assert.assertEquals(SegmentId.dummy(LOOKUP_NAME), LOOKUP_SEGMENT.getId());
}
@Test
public void test_getDataInterval()
{
Assert.assertEquals(Intervals.ETERNITY, LOOKUP_SEGMENT.getDataInterval());
}
@Test
public void test_asQueryableIndex()
{
Assert.assertNull(LOOKUP_SEGMENT.asQueryableIndex());
}
@Test
public void test_asStorageAdapter_getAvailableDimensions()
{
Assert.assertEquals(
ImmutableList.of("k", "v"),
Lists.newArrayList(LOOKUP_SEGMENT.asStorageAdapter().getAvailableDimensions().iterator())
);
}
@Test
public void test_asStorageAdapter_getAvailableMetrics()
{
Assert.assertEquals(
ImmutableList.of(),
Lists.newArrayList(LOOKUP_SEGMENT.asStorageAdapter().getAvailableMetrics())
);
}
@Test
public void test_asStorageAdapter_getColumnCapabilitiesK()
{
final ColumnCapabilities capabilities = LOOKUP_SEGMENT.asStorageAdapter().getColumnCapabilities("k");
Assert.assertEquals(ValueType.STRING, capabilities.getType());
// Note: the "k" column does not actually have multiple values, but the RowBasedStorageAdapter doesn't allow
// reporting complete single-valued capabilities. It would be good to change this in the future, so query engines
// running on top of lookups can take advantage of singly-valued optimizations.
Assert.assertTrue(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.isDictionaryEncoded());
Assert.assertFalse(capabilities.isComplete());
}
@Test
public void test_asStorageAdapter_getColumnCapabilitiesV()
{
final ColumnCapabilities capabilities = LOOKUP_SEGMENT.asStorageAdapter().getColumnCapabilities("v");
// Note: the "v" column does not actually have multiple values, but the RowBasedStorageAdapter doesn't allow
// reporting complete single-valued capabilities. It would be good to change this in the future, so query engines
// running on top of lookups can take advantage of singly-valued optimizations.
Assert.assertEquals(ValueType.STRING, capabilities.getType());
Assert.assertTrue(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.isDictionaryEncoded());
Assert.assertFalse(capabilities.isComplete());
}
@Test
public void test_asStorageAdapter_getInterval()
{
Assert.assertEquals(Intervals.ETERNITY, LOOKUP_SEGMENT.asStorageAdapter().getInterval());
}
@Test
public void test_asStorageAdapter_getDimensionCardinalityK()
{
Assert.assertEquals(Integer.MAX_VALUE, LOOKUP_SEGMENT.asStorageAdapter().getDimensionCardinality("k"));
}
@Test
public void test_asStorageAdapter_getDimensionCardinalityV()
{
Assert.assertEquals(Integer.MAX_VALUE, LOOKUP_SEGMENT.asStorageAdapter().getDimensionCardinality("v"));
}
@Test
public void test_asStorageAdapter_makeCursors()
{
final Sequence<Cursor> cursors = LOOKUP_SEGMENT.asStorageAdapter().makeCursors(
null,
Intervals.of("1970/PT1H"),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
final Cursor cursor = Iterables.getOnlyElement(cursors.toList());
final List<Pair<String, String>> kvs = new ArrayList<>();
final ColumnValueSelector keySelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("k");
final ColumnValueSelector valueSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
while (!cursor.isDone()) {
kvs.add(Pair.of(String.valueOf(keySelector.getObject()), String.valueOf(valueSelector.getObject())));
cursor.advanceUninterruptibly();
}
Assert.assertEquals(
ImmutableList.of(
Pair.of("a", "b"),
Pair.of("x", "y")
),
kvs
);
}
@Test
public void test_asStorageAdapter_isRowBasedAdapter()
{
// This allows us to assume that RowBasedStorageAdapterTest is further exercising makeCursors and verifying misc.
// methods like getMinTime, getMaxTime, getMetadata, etc, without checking them explicitly in _this_ test class.
Assert.assertThat(LOOKUP_SEGMENT.asStorageAdapter(), CoreMatchers.instanceOf(RowBasedStorageAdapter.class));
}
}

View File

@ -19,28 +19,33 @@
package org.apache.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
/**
* Helps tests make segments.
@ -63,7 +68,7 @@ public class IndexBuilder
private IndexBuilder()
{
// Callers must use "create".
}
public static IndexBuilder create()
@ -163,27 +168,13 @@ public class IndexBuilder
indexMerger.merge(
Lists.transform(
persisted,
new Function<QueryableIndex, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(QueryableIndex input)
{
return new QueryableIndexIndexableAdapter(input);
}
}
QueryableIndexIndexableAdapter::new
),
true,
Iterables.toArray(
Iterables.transform(
Arrays.asList(schema.getMetrics()),
new Function<AggregatorFactory, AggregatorFactory>()
{
@Override
public AggregatorFactory apply(AggregatorFactory input)
{
return input.getCombiningFactory();
}
}
AggregatorFactory::getCombiningFactory
),
AggregatorFactory.class
),
@ -201,6 +192,40 @@ public class IndexBuilder
}
}
public RowBasedSegment<InputRow> buildRowBasedSegmentWithoutTypeSignature()
{
return new RowBasedSegment<>(
SegmentId.dummy("IndexBuilder"),
rows,
RowAdapters.standardRow(),
ImmutableMap.of()
);
}
public RowBasedSegment<InputRow> buildRowBasedSegmentWithTypeSignature()
{
// Determine row signature by building an mmapped index first.
try (final QueryableIndex index = buildMMappedIndex()) {
final Map<String, ValueType> rowSignature =
index.getColumnNames().stream().collect(
Collectors.toMap(
column -> column,
column -> {
final ColumnCapabilities capabilities = index.getColumnHolder(column).getCapabilities();
return capabilities.getType();
}
)
);
return new RowBasedSegment<>(
SegmentId.dummy("IndexBuilder"),
rows,
RowAdapters.standardRow(),
rowSignature
);
}
}
private static IncrementalIndex buildIncrementalIndexWithRows(
IncrementalIndexSchema schema,
int maxRows,

View File

@ -0,0 +1,794 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
public class RowBasedStorageAdapterTest
{
private static final Map<String, ValueType> ROW_SIGNATURE =
ImmutableMap.<String, ValueType>builder()
.put(ValueType.FLOAT.name(), ValueType.FLOAT)
.put(ValueType.DOUBLE.name(), ValueType.DOUBLE)
.put(ValueType.LONG.name(), ValueType.LONG)
.put(ValueType.STRING.name(), ValueType.STRING)
.put(ValueType.COMPLEX.name(), ValueType.COMPLEX)
.build();
private static final List<Function<Cursor, Supplier<Object>>> READ_STRING =
ImmutableList.of(
cursor -> {
final BaseObjectColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ValueType.STRING.name());
return selector::getObject;
}
);
private static final List<Function<Cursor, Supplier<Object>>> READ_TIME_AND_STRING =
ImmutableList.of(
cursor -> cursor::getTime,
cursor -> {
final BaseObjectColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ValueType.STRING.name());
return selector::getObject;
}
);
// Processors used by the "allProcessors" tasks.
private static final LinkedHashMap<String, Function<Cursor, Supplier<Object>>> PROCESSORS = new LinkedHashMap<>();
@BeforeClass
public static void setUpClass()
{
NullHandling.initializeForTests();
PROCESSORS.clear();
PROCESSORS.put(
"cursor-time",
cursor -> cursor::getTime
);
// Read all the types as all the other types.
for (final String valueTypeName : ROW_SIGNATURE.keySet()) {
PROCESSORS.put(
StringUtils.format("%s-float", StringUtils.toLowerCase(valueTypeName)),
cursor -> {
final BaseFloatColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName);
return () -> {
if (selector.isNull()) {
return null;
} else {
return selector.getFloat();
}
};
}
);
PROCESSORS.put(
StringUtils.format("%s-double", StringUtils.toLowerCase(valueTypeName)),
cursor -> {
final BaseDoubleColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName);
return () -> {
if (selector.isNull()) {
return null;
} else {
return selector.getDouble();
}
};
}
);
PROCESSORS.put(
StringUtils.format("%s-long", StringUtils.toLowerCase(valueTypeName)),
cursor -> {
final BaseLongColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName);
return () -> {
if (selector.isNull()) {
return null;
} else {
return selector.getLong();
}
};
}
);
PROCESSORS.put(
StringUtils.format("%s-string", StringUtils.toLowerCase(valueTypeName)),
cursor -> {
final DimensionSelector selector =
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(valueTypeName));
return selector::defaultGetObject;
}
);
PROCESSORS.put(
StringUtils.format("%s-object", StringUtils.toLowerCase(valueTypeName)),
cursor -> {
final BaseObjectColumnValueSelector selector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(valueTypeName);
return selector::getObject;
}
);
}
}
/**
* A RowAdapter for Integers where:
*
* 1) timestampFunction returns a timestamp where the millis instant is equal to that integer as a number of hours
* since the epoch (1970).
* 2) columnFunction provides columns named after value types where each one equal to the cast to that type. All
* other columns return null.
*/
private static final RowAdapter<Integer> ROW_ADAPTER =
new RowAdapter<Integer>()
{
@Override
public ToLongFunction<Integer> timestampFunction()
{
return i -> i * Duration.standardHours(1).getMillis();
}
@Override
public Function<Integer, Object> columnFunction(String columnName)
{
final ValueType valueType = GuavaUtils.getEnumIfPresent(ValueType.class, columnName);
if (valueType == null || valueType == ValueType.COMPLEX) {
return i -> null;
} else {
return i -> DimensionHandlerUtils.convertObjectToType(i, valueType);
}
}
};
private static RowBasedStorageAdapter<Integer> createIntAdapter(final int... ints)
{
return new RowBasedStorageAdapter<>(
Arrays.stream(ints).boxed().collect(Collectors.toList()),
ROW_ADAPTER,
ROW_SIGNATURE
);
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void test_getInterval()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter();
Assert.assertEquals(Intervals.ETERNITY, adapter.getInterval());
}
@Test
public void test_getAvailableDimensions()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter();
// Sort them for comparison purposes.
Assert.assertEquals(
ROW_SIGNATURE.keySet().stream().sorted().collect(Collectors.toList()),
Lists.newArrayList(adapter.getAvailableDimensions()).stream().sorted().collect(Collectors.toList())
);
}
@Test
public void test_getAvailableMetrics()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter();
Assert.assertEquals(
Collections.emptyList(),
Lists.newArrayList(adapter.getAvailableMetrics())
);
}
@Test
public void test_getDimensionCardinality_knownColumns()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
// Row based adapters don't know cardinality (they don't walk their Iterables until makeCursors is called).
for (String column : ROW_SIGNATURE.keySet()) {
Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality(column));
}
}
@Test
public void test_getDimensionCardinality_unknownColumn()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality("unknown"));
}
@Test
public void test_getDimensionCardinality_timeColumn()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality("__time"));
}
@Test
public void test_getMinTime()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertEquals(Intervals.ETERNITY.getStart(), adapter.getMinTime());
}
@Test
public void test_getMaxTime()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertEquals(Intervals.ETERNITY.getEnd().minus(1), adapter.getMaxTime());
}
@Test
public void test_getMinValue()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
// Row based adapters don't know min/max values, so they always return null.
// Test both known and unknown columns.
final List<String> columns =
ImmutableList.<String>builder().addAll(ROW_SIGNATURE.keySet()).add("unknown", "__time").build();
for (String column : columns) {
Assert.assertNull(column, adapter.getMinValue(column));
}
}
@Test
public void test_getMaxValue()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
// Row based adapters don't know min/max values, so they always return null.
// Test both known and unknown columns.
final List<String> columns =
ImmutableList.<String>builder().addAll(ROW_SIGNATURE.keySet()).add("unknown", "__time").build();
for (String column : columns) {
Assert.assertNull(column, adapter.getMaxValue(column));
}
}
@Test
public void test_getCapabilities()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
// Row based adapters don't know cardinality (they don't walk their Iterables until makeCursors is called).
for (String column : ROW_SIGNATURE.keySet()) {
Assert.assertEquals(Integer.MAX_VALUE, adapter.getDimensionCardinality(column));
}
}
@Test
public void test_getColumnCapabilities_float()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.FLOAT.name());
Assert.assertEquals(ValueType.FLOAT, capabilities.getType());
Assert.assertFalse(capabilities.hasMultipleValues());
Assert.assertTrue(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_double()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.DOUBLE.name());
Assert.assertEquals(ValueType.DOUBLE, capabilities.getType());
Assert.assertFalse(capabilities.hasMultipleValues());
Assert.assertTrue(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_long()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.LONG.name());
Assert.assertEquals(ValueType.LONG, capabilities.getType());
Assert.assertFalse(capabilities.hasMultipleValues());
Assert.assertTrue(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_string()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.STRING.name());
Assert.assertEquals(ValueType.STRING, capabilities.getType());
// Note: unlike numeric types, STRING-typed columns report that they might have multiple values and that they
// are incomplete. It would be good in the future to support some way of changing this, when it is known ahead
// of time that multi-valuedness is impossible.
Assert.assertTrue(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_complex()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(ValueType.COMPLEX.name());
// Note: unlike numeric types, COMPLEX-typed columns report that they might have multiple values and that they
// are incomplete.
Assert.assertEquals(ValueType.COMPLEX, capabilities.getType());
Assert.assertTrue(capabilities.hasMultipleValues());
Assert.assertFalse(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_nonexistent()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertNull(adapter.getColumnCapabilities("nonexistent"));
}
@Test
public void test_getColumnTypeName()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
for (String columnName : ROW_SIGNATURE.keySet()) {
Assert.assertEquals(columnName, ValueType.valueOf(columnName).name(), adapter.getColumnTypeName(columnName));
}
}
@Test
public void test_getColumnTypeName_nonexistent()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertNull(adapter.getColumnTypeName("nonexistent"));
}
@Test
public void test_getNumRows()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
expectedException.expect(UnsupportedOperationException.class);
adapter.getMetadata();
}
@Test
public void test_getMaxIngestedEventTime()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
Assert.assertEquals(Intervals.ETERNITY.getEnd().minus(1), adapter.getMaxIngestedEventTime());
}
@Test
public void test_getMetadata()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
expectedException.expect(UnsupportedOperationException.class);
adapter.getMetadata();
}
@Test
public void test_makeCursors_filterOnLong()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final Sequence<Cursor> cursors = adapter.makeCursors(
new SelectorDimFilter(ValueType.LONG.name(), "1.0", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("1")
),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_filterOnNonexistentColumnEqualsNull()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1);
final Sequence<Cursor> cursors = adapter.makeCursors(
new SelectorDimFilter("nonexistent", null, null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("0"),
ImmutableList.of("1")
),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_filterOnVirtualColumn()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1);
final Sequence<Cursor> cursors = adapter.makeCursors(
new SelectorDimFilter("vc", "2", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.create(
ImmutableList.of(
new ExpressionVirtualColumn(
"vc",
"\"LONG\" + 1",
ValueType.LONG,
ExprMacroTable.nil()
)
)
),
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("1")
),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_descending()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
true,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("2"),
ImmutableList.of("1"),
ImmutableList.of("0")
),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_intervalDoesNotMatch()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.of("2000/P1D"),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_intervalPartiallyMatches()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.of("1970-01-01T01/PT1H"),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("1")
),
walkCursors(cursors, READ_STRING)
);
}
@Test
public void test_makeCursors_hourGranularity()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 1, 2, 3);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.of("1970/1971"),
VirtualColumns.EMPTY,
Granularities.HOUR,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(DateTimes.of("1970-01-01T00"), "0"),
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"),
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"),
ImmutableList.of(DateTimes.of("1970-01-01T02"), "2"),
ImmutableList.of(DateTimes.of("1970-01-01T03"), "3")
),
walkCursors(cursors, READ_TIME_AND_STRING)
);
}
@Test
public void test_makeCursors_hourGranularityWithInterval()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 1, 2, 3);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.of("1970-01-01T01/PT2H"),
VirtualColumns.EMPTY,
Granularities.HOUR,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"),
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"),
ImmutableList.of(DateTimes.of("1970-01-01T02"), "2")
),
walkCursors(cursors, READ_TIME_AND_STRING)
);
}
@Test
public void test_makeCursors_hourGranularityWithIntervalDescending()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 1, 2, 3);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.of("1970-01-01T01/PT2H"),
VirtualColumns.EMPTY,
Granularities.HOUR,
true,
null
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(DateTimes.of("1970-01-01T02"), "2"),
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1"),
ImmutableList.of(DateTimes.of("1970-01-01T01"), "1")
),
walkCursors(cursors, READ_TIME_AND_STRING)
);
}
@Test
public void test_makeCursors_allProcessors()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1);
final Sequence<Cursor> cursors = adapter.makeCursors(
null,
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(
Lists.newArrayList(
Intervals.ETERNITY.getStart(),
// FLOAT
0f,
0d,
0L,
"0.0",
0f,
// DOUBLE
0f,
0d,
0L,
"0.0",
0d,
// LONG
0f,
0d,
0L,
"0",
0L,
// STRING
0f,
0d,
0L,
"0",
"0",
// COMPLEX
NullHandling.defaultFloatValue(),
NullHandling.defaultDoubleValue(),
NullHandling.defaultLongValue(),
null,
null
),
Lists.newArrayList(
Intervals.ETERNITY.getStart(),
// FLOAT
1f,
1d,
1L,
"1.0",
1f,
// DOUBLE
1f,
1d,
1L,
"1.0",
1d,
// LONG
1f,
1d,
1L,
"1",
1L,
// STRING
1f,
1d,
1L,
"1",
"1",
// COMPLEX
NullHandling.defaultFloatValue(),
NullHandling.defaultDoubleValue(),
NullHandling.defaultLongValue(),
null,
null
)
),
walkCursors(cursors, new ArrayList<>(PROCESSORS.values()))
);
}
@Test
public void test_makeCursors_filterOnNonexistentColumnEqualsNonnull()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1);
final Sequence<Cursor> cursors = adapter.makeCursors(
new SelectorDimFilter("nonexistent", "abc", null).toFilter(),
Intervals.ETERNITY,
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
Assert.assertEquals(
ImmutableList.of(),
walkCursors(cursors, new ArrayList<>(PROCESSORS.values()))
);
}
private static List<List<Object>> walkCursors(
final Sequence<Cursor> cursors,
final List<Function<Cursor, Supplier<Object>>> processors
)
{
return cursors.flatMap(
cursor -> {
// Gather test-value suppliers together.
final List<Supplier<Object>> suppliers = new ArrayList<>();
for (Function<Cursor, Supplier<Object>> processor : processors) {
suppliers.add(processor.apply(cursor));
}
final List<List<Object>> retVal = new ArrayList<>();
while (!cursor.isDone()) {
final List<Object> row = new ArrayList<>();
for (Supplier<Object> supplier : suppliers) {
row.add(supplier.get());
}
retVal.add(row);
cursor.advanceUninterruptibly();
}
return Sequences.simple(retVal);
}
).toList();
}
}

View File

@ -67,6 +67,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.RowBasedStorageAdapter;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
@ -263,23 +264,38 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest
"off-heap memory segment write-out medium", OffHeapMemorySegmentWriteOutMediumFactory.instance()
);
final Map<String, Function<IndexBuilder, Pair<StorageAdapter, Closeable>>> finishers = ImmutableMap.of(
"incremental",
input -> {
final IncrementalIndex index = input.buildIncrementalIndex();
return Pair.of(new IncrementalIndexStorageAdapter(index), index);
},
"mmapped",
input -> {
final QueryableIndex index = input.buildMMappedIndex();
return Pair.of(new QueryableIndexStorageAdapter(index), index);
},
"mmappedMerged",
input -> {
final QueryableIndex index = input.buildMMappedMergedIndex();
return Pair.of(new QueryableIndexStorageAdapter(index), index);
}
);
final Map<String, Function<IndexBuilder, Pair<StorageAdapter, Closeable>>> finishers =
ImmutableMap.<String, Function<IndexBuilder, Pair<StorageAdapter, Closeable>>>builder()
.put(
"incremental",
input -> {
final IncrementalIndex index = input.buildIncrementalIndex();
return Pair.of(new IncrementalIndexStorageAdapter(index), index);
}
)
.put(
"mmapped",
input -> {
final QueryableIndex index = input.buildMMappedIndex();
return Pair.of(new QueryableIndexStorageAdapter(index), index);
}
)
.put(
"mmappedMerged",
input -> {
final QueryableIndex index = input.buildMMappedMergedIndex();
return Pair.of(new QueryableIndexStorageAdapter(index), index);
}
)
.put(
"rowBasedWithoutTypeSignature",
input -> Pair.of(input.buildRowBasedSegmentWithoutTypeSignature().asStorageAdapter(), () -> {})
)
.put(
"rowBasedWithTypeSignature",
input -> Pair.of(input.buildRowBasedSegmentWithTypeSignature().asStorageAdapter(), () -> {})
)
.build();
for (Map.Entry<String, BitmapSerdeFactory> bitmapSerdeFactoryEntry : bitmapSerdeFactories.entrySet()) {
for (Map.Entry<String, SegmentWriteOutMediumFactory> segmentWriteOutMediumFactoryEntry :
@ -664,8 +680,9 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest
final List<String> expectedRows
)
{
// IncrementalIndex cannot ever vectorize.
final boolean testVectorized = !(adapter instanceof IncrementalIndexStorageAdapter);
// IncrementalIndex and RowBasedSegment cannot ever vectorize.
final boolean testVectorized =
!(adapter instanceof IncrementalIndexStorageAdapter) && !(adapter instanceof RowBasedStorageAdapter);
assertFilterMatches(filter, expectedRows, testVectorized);
}