abstract `IncrementalIndex` cursor stuff to prepare for using different "views" of the data based on the cursor build spec (#17064)

* abstract `IncrementalIndex` cursor stuff to prepare to allow for possibility of using different "views" of the data based on the cursor build spec
changes:
* introduce `IncrementalIndexRowSelector` interface to capture how `IncrementalIndexCursor` and `IncrementalIndexColumnSelectorFactory` read data
* `IncrementalIndex` implements `IncrementalIndexRowSelector`
* move `FactsHolder` interface to separate file
* other minor refactorings
This commit is contained in:
Clint Wylie 2024-09-15 16:45:51 -07:00 committed by GitHub
parent aa6336c5cf
commit 73a644258d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 593 additions and 418 deletions

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.incremental;
import java.util.Comparator;
import java.util.Iterator;
/**
* {@link IncrementalIndexRow} storage interface, a mutable data structure for building up a set or rows to eventually
* persist into an immutable segment
*
* @see IncrementalIndex for the data processor which constructs {@link IncrementalIndexRow} to store here
*/
public interface FactsHolder
{
/**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/
int getPriorIndex(IncrementalIndexRow key);
/**
* Get minimum {@link IncrementalIndexRow#getTimestamp()} present in the facts holder
*/
long getMinTimeMillis();
/**
* Get maximum {@link IncrementalIndexRow#getTimestamp()} present in the facts holder
*/
long getMaxTimeMillis();
/**
* Get all {@link IncrementalIndex}, depending on the implementation, these rows may or may not be ordered in the same
* order they will be persisted in. Use {@link #persistIterable()} if this is required.
*/
Iterator<IncrementalIndexRow> iterator(boolean descending);
/**
* Get all {@link IncrementalIndexRow} with {@link IncrementalIndexRow#getTimestamp()} between the start and end
* timestamps specified
*/
Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd);
/**
* Get all row {@link IncrementalIndexRow} 'keys', which is distinct groups if this is an aggregating facts holder or
* just every row present if not
*/
Iterable<IncrementalIndexRow> keySet();
/**
* Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator <IncrementalIndexRow>}
*/
Iterable<IncrementalIndexRow> persistIterable();
/**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/
int putIfAbsent(IncrementalIndexRow key, int rowIndex);
/**
* Clear all rows present in the facts holder
*/
void clear();
}

View File

@ -105,7 +105,7 @@ import java.util.stream.Collectors;
* {@link IncrementalIndexCursorFactory} are thread-safe, and may be called concurrently with each other, and with
* the "add" methods. This concurrency model supports real-time queries of the data in the index.
*/
public abstract class IncrementalIndex implements Iterable<Row>, Closeable, ColumnInspector
public abstract class IncrementalIndex implements IncrementalIndexRowSelector, ColumnInspector, Iterable<Row>, Closeable
{
/**
* Column selector used at ingestion time for inputs to aggregators.
@ -255,8 +255,9 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
private final boolean useSchemaDiscovery;
private final InputRowHolder inputRowHolder = new InputRowHolder();
protected final InputRowHolder inputRowHolder = new InputRowHolder();
@Nullable
private volatile DateTime maxIngestedEventTime;
/**
@ -366,8 +367,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
);
}
public abstract FactsHolder getFacts();
public abstract boolean canAppendRow();
public abstract String getOutOfRowsReason();
@ -384,100 +383,11 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
boolean skipMaxRowsInMemoryCheck
) throws IndexSizeExceededException;
public abstract int getLastRowIndex();
protected abstract float getMetricFloatValue(int rowOffset, int aggOffset);
protected abstract long getMetricLongValue(int rowOffset, int aggOffset);
protected abstract Object getMetricObjectValue(int rowOffset, int aggOffset);
protected abstract double getMetricDoubleValue(int rowOffset, int aggOffset);
protected abstract boolean isNull(int rowOffset, int aggOffset);
static class IncrementalIndexRowResult
{
private final IncrementalIndexRow incrementalIndexRow;
private final List<String> parseExceptionMessages;
IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List<String> parseExceptionMessages)
{
this.incrementalIndexRow = incrementalIndexRow;
this.parseExceptionMessages = parseExceptionMessages;
}
IncrementalIndexRow getIncrementalIndexRow()
{
return incrementalIndexRow;
}
List<String> getParseExceptionMessages()
{
return parseExceptionMessages;
}
}
static class AddToFactsResult
{
private final int rowCount;
private final long bytesInMemory;
private final List<String> parseExceptionMessages;
public AddToFactsResult(
int rowCount,
long bytesInMemory,
List<String> parseExceptionMessages
)
{
this.rowCount = rowCount;
this.bytesInMemory = bytesInMemory;
this.parseExceptionMessages = parseExceptionMessages;
}
int getRowCount()
{
return rowCount;
}
public long getBytesInMemory()
{
return bytesInMemory;
}
public List<String> getParseExceptionMessages()
{
return parseExceptionMessages;
}
}
public static class InputRowHolder
{
@Nullable
private InputRow row;
private long rowId = -1;
public void set(final InputRow row)
{
this.row = row;
this.rowId++;
}
public void unset()
{
this.row = null;
}
public InputRow getRow()
{
return Preconditions.checkNotNull(row, "row");
}
public long getRowId()
{
return rowId;
}
}
public abstract Iterable<Row> iterableWithPostAggregations(
@Nullable List<PostAggregator> postAggs,
boolean descending
);
public boolean isRollup()
{
@ -746,23 +656,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
);
}
private static String getSimplifiedEventStringFromRow(InputRow inputRow)
{
if (inputRow instanceof MapBasedInputRow) {
return ((MapBasedInputRow) inputRow).getEvent().toString();
}
if (inputRow instanceof ListBasedInputRow) {
return ((ListBasedInputRow) inputRow).asMap().toString();
}
if (inputRow instanceof TransformedInputRow) {
InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
return getSimplifiedEventStringFromRow(innerRow);
}
return inputRow.toString();
}
private synchronized void updateMaxIngestedTime(DateTime eventTime)
{
@ -771,6 +664,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
}
}
@Override
public boolean isEmpty()
{
return numEntries.get() == 0;
@ -861,6 +755,7 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
/**
* Returns the descriptor for a particular dimension.
*/
@Override
@Nullable
public DimensionDesc getDimension(String dimension)
{
@ -869,22 +764,39 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
}
}
public ColumnValueSelector<?> makeMetricColumnValueSelector(String metric, IncrementalIndexRowHolder currEntry)
@Override
@Nullable
public MetricDesc getMetric(String metric)
{
MetricDesc metricDesc = metricDescs.get(metric);
return metricDescs.get(metric);
}
@Override
public List<OrderBy> getOrdering()
{
return metadata.getOrdering();
}
public static ColumnValueSelector<?> makeMetricColumnValueSelector(
IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
String metric
)
{
final MetricDesc metricDesc = rowSelector.getMetric(metric);
if (metricDesc == null) {
return NilColumnValueSelector.instance();
}
int metricIndex = metricDesc.getIndex();
switch (metricDesc.getCapabilities().getType()) {
case COMPLEX:
return new ObjectMetricColumnSelector(metricDesc, currEntry, metricIndex);
return new ObjectMetricColumnSelector(rowSelector, currEntry, metricDesc);
case LONG:
return new LongMetricColumnSelector(currEntry, metricIndex);
return new LongMetricColumnSelector(rowSelector, currEntry, metricIndex);
case FLOAT:
return new FloatMetricColumnSelector(currEntry, metricIndex);
return new FloatMetricColumnSelector(rowSelector, currEntry, metricIndex);
case DOUBLE:
return new DoubleMetricColumnSelector(currEntry, metricIndex);
return new DoubleMetricColumnSelector(rowSelector, currEntry, metricIndex);
case STRING:
throw new IllegalStateException("String is not a metric column type");
default:
@ -910,13 +822,6 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return isEmpty() ? null : DateTimes.utc(getMaxTimeMillis());
}
@Nullable
public Integer getDimensionIndex(String dimension)
{
DimensionDesc dimSpec = getDimension(dimension);
return dimSpec == null ? null : dimSpec.getIndex();
}
/**
* Returns names of time and dimension columns, in persist sort order. Includes {@link ColumnHolder#TIME_COLUMN_NAME}.
*/
@ -1003,6 +908,49 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return metadata;
}
@Override
public Iterator<Row> iterator()
{
return iterableWithPostAggregations(null, false).iterator();
}
public DateTime getMaxIngestedEventTime()
{
return maxIngestedEventTime;
}
protected ColumnSelectorFactory makeColumnSelectorFactory(
@Nullable final AggregatorFactory agg,
final InputRowHolder in
)
{
return makeColumnSelectorFactory(virtualColumns, in, agg);
}
protected final Comparator<IncrementalIndexRow> dimsComparator()
{
return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
}
private static String getSimplifiedEventStringFromRow(InputRow inputRow)
{
if (inputRow instanceof MapBasedInputRow) {
return ((MapBasedInputRow) inputRow).getEvent().toString();
}
if (inputRow instanceof ListBasedInputRow) {
return ((ListBasedInputRow) inputRow).asMap().toString();
}
if (inputRow instanceof TransformedInputRow) {
InputRow innerRow = ((TransformedInputRow) inputRow).getBaseRow();
return getSimplifiedEventStringFromRow(innerRow);
}
return inputRow.toString();
}
private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)
{
AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length];
@ -1012,30 +960,24 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return combiningAggregators;
}
@Override
public Iterator<Row> iterator()
private static boolean allNull(Object[] dims, int startPosition)
{
return iterableWithPostAggregations(null, false).iterator();
}
public abstract Iterable<Row> iterableWithPostAggregations(
@Nullable List<PostAggregator> postAggs,
boolean descending
);
public DateTime getMaxIngestedEventTime()
{
return maxIngestedEventTime;
for (int i = startPosition; i < dims.length; i++) {
if (dims[i] != null) {
return false;
}
}
return true;
}
public static final class DimensionDesc
{
private final int index;
private final String name;
private final DimensionHandler handler;
private final DimensionIndexer indexer;
private final DimensionHandler<?, ?, ?> handler;
private final DimensionIndexer<?, ?, ?> indexer;
public DimensionDesc(int index, String name, DimensionHandler handler, boolean useMaxMemoryEstimates)
public DimensionDesc(int index, String name, DimensionHandler<?, ?, ?> handler, boolean useMaxMemoryEstimates)
{
this.index = index;
this.name = name;
@ -1058,12 +1000,12 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
return indexer.getColumnCapabilities();
}
public DimensionHandler getHandler()
public DimensionHandler<?, ?, ?> getHandler()
{
return handler;
}
public DimensionIndexer getIndexer()
public DimensionIndexer<?, ?, ?> getIndexer()
{
return indexer;
}
@ -1124,19 +1066,90 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
}
}
protected ColumnSelectorFactory makeColumnSelectorFactory(
@Nullable final AggregatorFactory agg,
final InputRowHolder in
)
public static class AddToFactsResult
{
return makeColumnSelectorFactory(virtualColumns, in, agg);
private final int rowCount;
private final long bytesInMemory;
private final List<String> parseExceptionMessages;
public AddToFactsResult(
int rowCount,
long bytesInMemory,
List<String> parseExceptionMessages
)
{
this.rowCount = rowCount;
this.bytesInMemory = bytesInMemory;
this.parseExceptionMessages = parseExceptionMessages;
}
int getRowCount()
{
return rowCount;
}
public long getBytesInMemory()
{
return bytesInMemory;
}
public List<String> getParseExceptionMessages()
{
return parseExceptionMessages;
}
}
protected final Comparator<IncrementalIndexRow> dimsComparator()
public static class InputRowHolder
{
return new IncrementalIndexRowComparator(timePosition, dimensionDescsList);
@Nullable
private InputRow row;
private long rowId = -1;
public void set(final InputRow row)
{
this.row = row;
this.rowId++;
}
public void unset()
{
this.row = null;
}
public InputRow getRow()
{
return Preconditions.checkNotNull(row, "row");
}
public long getRowId()
{
return rowId;
}
}
static class IncrementalIndexRowResult
{
private final IncrementalIndexRow incrementalIndexRow;
private final List<String> parseExceptionMessages;
IncrementalIndexRowResult(IncrementalIndexRow incrementalIndexRow, List<String> parseExceptionMessages)
{
this.incrementalIndexRow = incrementalIndexRow;
this.parseExceptionMessages = parseExceptionMessages;
}
IncrementalIndexRow getIncrementalIndexRow()
{
return incrementalIndexRow;
}
List<String> getParseExceptionMessages()
{
return parseExceptionMessages;
}
}
@VisibleForTesting
static final class IncrementalIndexRowComparator implements Comparator<IncrementalIndexRow>
{
@ -1207,57 +1220,19 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
}
}
private static boolean allNull(Object[] dims, int startPosition)
{
for (int i = startPosition; i < dims.length; i++) {
if (dims[i] != null) {
return false;
}
}
return true;
}
public interface FactsHolder
{
/**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/
int getPriorIndex(IncrementalIndexRow key);
long getMinTimeMillis();
long getMaxTimeMillis();
Iterator<IncrementalIndexRow> iterator(boolean descending);
Iterable<IncrementalIndexRow> timeRangeIterable(boolean descending, long timeStart, long timeEnd);
Iterable<IncrementalIndexRow> keySet();
/**
* Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>}
*
* @return
*/
Iterable<IncrementalIndexRow> persistIterable();
/**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/
int putIfAbsent(IncrementalIndexRow key, int rowIndex);
void clear();
}
private final class LongMetricColumnSelector implements LongColumnSelector
private static final class LongMetricColumnSelector implements LongColumnSelector
{
private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
public LongMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex)
public LongMetricColumnSelector(
IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
int metricIndex
)
{
this.rowSelector = rowSelector;
this.currEntry = currEntry;
this.metricIndex = metricIndex;
}
@ -1265,99 +1240,72 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
@Override
public long getLong()
{
assert NullHandling.replaceWithDefault() || !isNull();
return getMetricLongValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", IncrementalIndex.this);
return rowSelector.getMetricLongValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public boolean isNull()
{
return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex);
}
}
private final class ObjectMetricColumnSelector extends ObjectColumnSelector
{
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
private Class classOfObject;
public ObjectMetricColumnSelector(
MetricDesc metricDesc,
IncrementalIndexRowHolder currEntry,
int metricIndex
)
{
this.currEntry = currEntry;
this.metricIndex = metricIndex;
classOfObject = ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
}
@Nullable
@Override
public Object getObject()
{
return getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public Class classOfObject()
{
return classOfObject;
return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", IncrementalIndex.this);
inspector.visit("index", rowSelector);
}
}
private final class FloatMetricColumnSelector implements FloatColumnSelector
private static final class FloatMetricColumnSelector implements FloatColumnSelector
{
private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
public FloatMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex)
public FloatMetricColumnSelector(
IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
int metricIndex
)
{
this.currEntry = currEntry;
this.rowSelector = rowSelector;
this.metricIndex = metricIndex;
}
@Override
public float getFloat()
{
assert NullHandling.replaceWithDefault() || !isNull();
return getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex);
return rowSelector.getMetricFloatValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", IncrementalIndex.this);
inspector.visit("index", rowSelector);
}
@Override
public boolean isNull()
{
return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex);
return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
}
private final class DoubleMetricColumnSelector implements DoubleColumnSelector
private static final class DoubleMetricColumnSelector implements DoubleColumnSelector
{
private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
public DoubleMetricColumnSelector(IncrementalIndexRowHolder currEntry, int metricIndex)
public DoubleMetricColumnSelector(
IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
int metricIndex
)
{
this.currEntry = currEntry;
this.rowSelector = rowSelector;
this.metricIndex = metricIndex;
}
@ -1365,19 +1313,58 @@ public abstract class IncrementalIndex implements Iterable<Row>, Closeable, Colu
public double getDouble()
{
assert NullHandling.replaceWithDefault() || !isNull();
return getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex);
return rowSelector.getMetricDoubleValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public boolean isNull()
{
return IncrementalIndex.this.isNull(currEntry.get().getRowIndex(), metricIndex);
return rowSelector.isNull(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", IncrementalIndex.this);
inspector.visit("index", rowSelector);
}
}
private static final class ObjectMetricColumnSelector extends ObjectColumnSelector
{
private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final int metricIndex;
private final Class<?> classOfObject;
public ObjectMetricColumnSelector(
IncrementalIndexRowSelector rowSelector,
IncrementalIndexRowHolder currEntry,
MetricDesc metricDesc
)
{
this.currEntry = currEntry;
this.rowSelector = rowSelector;
this.metricIndex = metricDesc.getIndex();
this.classOfObject = ComplexMetrics.getSerdeForType(metricDesc.getType()).getObjectStrategy().getClazz();
}
@Nullable
@Override
public Object getObject()
{
return rowSelector.getMetricObjectValue(currEntry.get().getRowIndex(), metricIndex);
}
@Override
public Class<?> classOfObject()
{
return classOfObject;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", rowSelector);
}
}
}

View File

@ -43,29 +43,29 @@ import javax.annotation.Nullable;
class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory, RowIdSupplier
{
private final ColumnInspector snapshotColumnInspector;
private final IncrementalIndex index;
private final VirtualColumns virtualColumns;
private final Order timeOrder;
private final IncrementalIndexRowHolder rowHolder;
private final IncrementalIndexRowSelector rowSelector;
IncrementalIndexColumnSelectorFactory(
IncrementalIndex index,
IncrementalIndexRowSelector rowSelector,
VirtualColumns virtualColumns,
Order timeOrder,
IncrementalIndexRowHolder rowHolder
)
{
this.index = index;
this.virtualColumns = virtualColumns;
this.timeOrder = timeOrder;
this.rowHolder = rowHolder;
this.rowSelector = rowSelector;
this.snapshotColumnInspector = new ColumnInspector()
{
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return IncrementalIndexCursorFactory.snapshotColumnCapabilities(index, column);
return IncrementalIndexCursorFactory.snapshotColumnCapabilities(rowSelector, column);
}
};
}
@ -87,13 +87,13 @@ class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory, Ro
if (dimension.equals(ColumnHolder.TIME_COLUMN_NAME) && timeOrder != Order.NONE) {
return new SingleScanTimeDimensionSelector(
makeColumnValueSelector(dimension),
makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
extractionFn,
timeOrder
);
}
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension());
final IncrementalIndex.DimensionDesc dimensionDesc = rowSelector.getDimension(dimensionSpec.getDimension());
if (dimensionDesc == null) {
// not a dimension, column may be a metric
ColumnCapabilities capabilities = getColumnCapabilities(dimension);
@ -122,19 +122,17 @@ class IncrementalIndexColumnSelectorFactory implements ColumnSelectorFactory, Ro
if (virtualColumns.exists(columnName)) {
return virtualColumns.makeColumnValueSelector(columnName, this);
}
if (columnName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
if (ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
return rowHolder;
}
final Integer dimIndex = index.getDimensionIndex(columnName);
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final IncrementalIndex.DimensionDesc dimensionDesc = rowSelector.getDimension(columnName);
if (dimensionDesc != null) {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return indexer.makeColumnValueSelector(rowHolder, dimensionDesc);
}
return index.makeMetricColumnValueSelector(columnName, rowHolder);
return IncrementalIndex.makeMetricColumnValueSelector(rowSelector, rowHolder, columnName);
}
@Override

View File

@ -99,9 +99,9 @@ public class IncrementalIndexCursorFactory implements CursorFactory
return snapshotColumnCapabilities(index, column);
}
static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndex index, String column)
static ColumnCapabilities snapshotColumnCapabilities(IncrementalIndexRowSelector selector, String column)
{
IncrementalIndex.DimensionDesc desc = index.getDimension(column);
IncrementalIndex.DimensionDesc desc = selector.getDimension(column);
// nested column indexer is a liar, and behaves like any type if it only processes unnested literals of a single
// type, so force it to use nested column type
if (desc != null && desc.getIndexer() instanceof NestedDataColumnIndexerV4) {
@ -122,7 +122,7 @@ public class IncrementalIndexCursorFactory implements CursorFactory
// multi-valuedness at cursor creation time, instead of the latest state, and getSnapshotColumnCapabilities could
// be removed.
return ColumnCapabilitiesImpl.snapshot(
index.getColumnCapabilities(column),
selector.getColumnCapabilities(column),
COERCE_LOGIC
);
}

View File

@ -23,52 +23,46 @@ import com.google.common.collect.Iterators;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.Cursors;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.filter.ValueMatchers;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class IncrementalIndexCursorHolder implements CursorHolder
{
private final IncrementalIndex index;
private final IncrementalIndexRowSelector rowSelector;
private final CursorBuildSpec spec;
private final List<OrderBy> ordering;
public IncrementalIndexCursorHolder(
IncrementalIndex index,
IncrementalIndexRowSelector rowSelector,
CursorBuildSpec spec
)
{
this.index = index;
this.rowSelector = rowSelector;
this.spec = spec;
if (index.timePosition == 0) {
List<OrderBy> ordering = rowSelector.getOrdering();
if (Cursors.getTimeOrdering(ordering) != Order.NONE) {
if (Cursors.preferDescendingTimeOrdering(spec)) {
this.ordering = Cursors.descendingTimeOrder();
} else {
this.ordering = Cursors.ascendingTimeOrder();
}
} else {
// In principle, we could report a sort order here for certain types of fact holders; for example the
// RollupFactsHolder would be sorted by dimensions. However, this is left for future work.
this.ordering = Collections.emptyList();
this.ordering = ordering;
}
}
@Override
public Cursor asCursor()
{
if (index.isEmpty()) {
if (rowSelector.isEmpty()) {
return null;
}
@ -76,13 +70,10 @@ public class IncrementalIndexCursorHolder implements CursorHolder
spec.getQueryMetrics().vectorized(false);
}
return new IncrementalIndexCursor(
index,
spec.getVirtualColumns(),
Cursors.getTimeOrdering(ordering),
spec.getFilter(),
spec.getInterval()
rowSelector,
spec,
Cursors.getTimeOrdering(ordering)
);
}
@ -94,11 +85,11 @@ public class IncrementalIndexCursorHolder implements CursorHolder
static class IncrementalIndexCursor implements Cursor
{
private IncrementalIndexRowHolder currEntry;
private final IncrementalIndexRowSelector rowSelector;
private final IncrementalIndexRowHolder currEntry;
private final ColumnSelectorFactory columnSelectorFactory;
private final ValueMatcher filterMatcher;
private final int maxRowIndex;
private final IncrementalIndex.FactsHolder facts;
private Iterator<IncrementalIndexRow> baseIter;
private Iterable<IncrementalIndexRow> cursorIterable;
private boolean emptyRange;
@ -106,30 +97,31 @@ public class IncrementalIndexCursorHolder implements CursorHolder
private boolean done;
IncrementalIndexCursor(
IncrementalIndex index,
VirtualColumns virtualColumns,
Order timeOrder,
@Nullable Filter filter,
Interval actualInterval
IncrementalIndexRowSelector index,
CursorBuildSpec buildSpec,
Order timeOrder
)
{
currEntry = new IncrementalIndexRowHolder();
// Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/druid/pull/6340
maxRowIndex = index.getLastRowIndex();
numAdvanced = -1;
rowSelector = index;
cursorIterable = rowSelector.getFacts().timeRangeIterable(
timeOrder == Order.DESCENDING,
buildSpec.getInterval().getStartMillis(),
buildSpec.getInterval().getEndMillis()
);
columnSelectorFactory = new IncrementalIndexColumnSelectorFactory(
index,
virtualColumns,
rowSelector,
buildSpec.getVirtualColumns(),
timeOrder,
currEntry
);
// Set maxRowIndex before creating the filterMatcher. See https://github.com/apache/druid/pull/6340
maxRowIndex = index.getLastRowIndex();
filterMatcher = filter == null ? ValueMatchers.allTrue() : filter.makeMatcher(columnSelectorFactory);
numAdvanced = -1;
facts = index.getFacts();
cursorIterable = facts.timeRangeIterable(
timeOrder == Order.DESCENDING,
actualInterval.getStartMillis(),
actualInterval.getEndMillis()
);
filterMatcher = buildSpec.getFilter() == null
? ValueMatchers.allTrue()
: buildSpec.getFilter().makeMatcher(columnSelectorFactory);
emptyRange = !cursorIterable.iterator().hasNext();
reset();
@ -152,7 +144,7 @@ public class IncrementalIndexCursorHolder implements CursorHolder
while (baseIter.hasNext()) {
BaseQuery.checkInterrupted();
IncrementalIndexRow entry = baseIter.next();
final IncrementalIndexRow entry = baseIter.next();
if (beyondMaxRowIndex(entry.getRowIndex())) {
continue;
}

View File

@ -144,6 +144,8 @@ public final class IncrementalIndexRow
{
if (input == null || (input.getClass().isArray() && Array.getLength(input) == 0)) {
return Collections.singletonList("null");
} else if (input instanceof int[]) {
return Arrays.toString((int[]) input);
}
return Collections.singletonList(input);
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.incremental;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.ColumnInspector;
import javax.annotation.Nullable;
import java.util.List;
/**
* Interface that abstracts selecting data from a {@link FactsHolder}
*/
public interface IncrementalIndexRowSelector extends ColumnInspector
{
/**
* get {@link IncrementalIndex.DimensionDesc} for the specified column, if available, which provides access to things
* like {@link org.apache.druid.segment.DimensionIndexer} and {@link org.apache.druid.segment.DimensionHandler} as
* well as column capabilities and position within the row
*/
@Nullable
IncrementalIndex.DimensionDesc getDimension(String columnName);
/**
* Get {@link IncrementalIndex.MetricDesc} which provides column capabilities and position in the aggregators section
* of the row
*/
@Nullable
IncrementalIndex.MetricDesc getMetric(String s);
/**
* Ordering for the data in the facts table
*/
List<OrderBy> getOrdering();
/**
* Are there any {@link IncrementalIndexRow} stored in the {@link FactsHolder}?
*/
boolean isEmpty();
/**
* Get the {@link FactsHolder} containing all of the {@link IncrementalIndexRow} backing this selector
*/
FactsHolder getFacts();
/**
* Highest value {@link IncrementalIndexRow#getRowIndex()} available in this selector. Note that these values do not
* reflect the position of the row in the {@link FactsHolder}, rather just the order in which they were processed
*/
int getLastRowIndex();
/**
* @param rowOffset row to get float aggregator value
* @param aggOffset position of the aggregator in the aggregators array of the data schema
* @return float value of the metric
*/
float getMetricFloatValue(int rowOffset, int aggOffset);
/**
* @param rowOffset row to get long aggregator value
* @param aggOffset position of the aggregator in the aggregators array of the data schema
* @return long value of the aggregator for this row
*/
long getMetricLongValue(int rowOffset, int aggOffset);
/**
* @param rowOffset row to get double aggregator value
* @param aggOffset position of the aggregator in the aggregators array of the data schema
* @return double value of the aggregator for this row
*/
double getMetricDoubleValue(int rowOffset, int aggOffset);
/**
* @param rowOffset row to get long aggregator value
* @param aggOffset position of the aggregator in the aggregators array of the data schema
* @return long value of the aggregator for this row
*/
@Nullable
Object getMetricObjectValue(int rowOffset, int aggOffset);
/**
* @param rowOffset row to check for a aggregator value
* @param aggOffset position of the aggregator in the aggregators array of the data schema
* @return is the value null for this row?
*/
boolean isNull(int rowOffset, int aggOffset);
}

View File

@ -155,7 +155,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
} else {
this.facts = new PlainNonTimeOrderedFactsHolder(dimsComparator());
}
maxBytesPerRowForAggregators =
this.maxBytesPerRowForAggregators =
useMaxMemoryEstimates ? getMaxBytesPerRowForAggregators(incrementalIndexSchema) : 0;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
}
@ -252,14 +252,15 @@ public class OnheapIncrementalIndex extends IncrementalIndex
) throws IndexSizeExceededException
{
final List<String> parseExceptionMessages = new ArrayList<>();
final AtomicLong totalSizeInBytes = getBytesInMemory();
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
final AggregatorFactory[] metrics = getMetrics();
final AtomicInteger numEntries = getNumEntries();
final AtomicLong totalSizeInBytes = getBytesInMemory();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex);
aggs = aggregators.get(priorIndex);
long aggSizeDelta = doAggregate(metrics, aggs, inputRowHolder, parseExceptionMessages);
totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
} else {
@ -272,7 +273,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
aggSizeForRow += doAggregate(metrics, aggs, inputRowHolder, parseExceptionMessages);
final int rowIndex = indexIncrement.getAndIncrement();
concurrentSet(rowIndex, aggs);
aggregators.put(rowIndex, aggs);
// Last ditch sanity checks
if ((numEntries.get() >= maxRowCount || totalSizeInBytes.get() >= maxBytesInMemory)
@ -363,6 +364,18 @@ public class OnheapIncrementalIndex extends IncrementalIndex
InputRowHolder inputRowHolder,
List<String> parseExceptionsHolder
)
{
return doAggregate(metrics, aggs, inputRowHolder, parseExceptionsHolder, useMaxMemoryEstimates, preserveExistingMetrics);
}
private static long doAggregate(
AggregatorFactory[] metrics,
Aggregator[] aggs,
InputRowHolder inputRowHolder,
List<String> parseExceptionsHolder,
boolean useMaxMemoryEstimates,
boolean preserveExistingMetrics
)
{
long totalIncrementalBytes = 0L;
for (int i = 0; i < metrics.length; i++) {
@ -418,17 +431,6 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
}
protected Aggregator[] concurrentGet(int offset)
{
// All get operations should be fine
return aggregators.get(offset);
}
protected void concurrentSet(int offset, Aggregator[] value)
{
aggregators.put(offset, value);
}
@Override
public boolean canAppendRow()
{
@ -459,42 +461,53 @@ public class OnheapIncrementalIndex extends IncrementalIndex
return outOfRowsReason;
}
protected Aggregator[] getAggsForRow(int rowOffset)
{
return concurrentGet(rowOffset);
}
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
return ((Number) getMetricHelper(
getMetricAggs(),
aggregators.get(rowOffset),
aggOffset,
Aggregator::getFloat
)).floatValue();
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
return ((Number) getMetricHelper(
getMetricAggs(),
aggregators.get(rowOffset),
aggOffset,
Aggregator::getLong
)).longValue();
}
@Override
public double getMetricDoubleValue(int rowOffset, int aggOffset)
{
return ((Number) getMetricHelper(
getMetricAggs(),
aggregators.get(rowOffset),
aggOffset,
Aggregator::getDouble
)).doubleValue();
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::get);
}
@Override
protected double getMetricDoubleValue(int rowOffset, int aggOffset)
{
return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
return getMetricHelper(getMetricAggs(), aggregators.get(rowOffset), aggOffset, Aggregator::get);
}
@Override
public boolean isNull(int rowOffset, int aggOffset)
{
final Aggregator[] aggs = aggregators.get(rowOffset);
if (preserveExistingMetrics) {
return concurrentGet(rowOffset)[aggOffset].isNull() && concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull();
return aggs[aggOffset].isNull() && aggs[aggOffset + getMetricAggs().length].isNull();
} else {
return concurrentGet(rowOffset)[aggOffset].isNull();
return aggs[aggOffset].isNull();
}
}
@ -535,7 +548,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
theVals.put(dimensionName, rowVals);
}
Aggregator[] aggs = getAggsForRow(rowOffset);
Aggregator[] aggs = aggregators.get(rowOffset);
int aggLength = preserveExistingMetrics ? aggs.length / 2 : aggs.length;
for (int i = 0; i < aggLength; ++i) {
theVals.put(metrics[i].getName(), getMetricHelper(metrics, aggs, i, Aggregator::get));
@ -560,11 +573,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex
* for aggregating from input into output field and the aggregator for combining already aggregated field, as needed
*/
@Nullable
private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
private <T> Object getMetricHelper(
AggregatorFactory[] metrics,
Aggregator[] aggs,
int aggOffset,
Function<Aggregator, T> getMetricTypeFunction
)
{
if (preserveExistingMetrics) {
// Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
// from two aggregators, the aggregator for aggregating from input into output field and the aggregator
// Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated
// values from two aggregators, the aggregator for aggregating from input into output field and the aggregator
// for combining already aggregated field
if (aggs[aggOffset].isNull()) {
// If the aggregator for aggregating from input into output field is null, then we get the value from the
@ -583,8 +601,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
}
} else {
// If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
// given aggOffset
// If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs,
// using the given aggOffset
return getMetricTypeFunction.apply(aggs[aggOffset]);
}
}

View File

@ -26,10 +26,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@ -48,15 +45,11 @@ public final class TestFrameProcessorUtils
{
final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema(
0,
new TimestampSpec("__time", "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
new AggregatorFactory[0],
false
)
IncrementalIndexSchema.builder()
.withTimestampSpec(new TimestampSpec("__time", "millis", null))
.withDimensionsSpec(DimensionsSpec.builder().useSchemaDiscovery(true).build())
.withRollup(false)
.build()
)
.setMaxRowCount(1000)
.build();

View File

@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
@ -494,18 +492,17 @@ public class AutoTypeColumnIndexerTest extends InitializedNullHandlingTest
long minTimestamp = System.currentTimeMillis();
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema(
minTimestamp,
new TimestampSpec(TIME_COL, "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.builder()
.setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING)))
.useSchemaDiscovery(true)
.build(),
new AggregatorFactory[0],
false
)
IncrementalIndexSchema.builder()
.withMinTimestamp(minTimestamp)
.withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null))
.withDimensionsSpec(
DimensionsSpec.builder()
.setDimensions(ImmutableList.of(new AutoTypeColumnSchema(NESTED_COL, ColumnType.STRING)))
.useSchemaDiscovery(true)
.build()
)
.withRollup(false)
.build()
)
.setMaxRowCount(1000)
.build();
@ -699,15 +696,16 @@ public class AutoTypeColumnIndexerTest extends InitializedNullHandlingTest
{
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema(
minTimestamp,
new TimestampSpec(TIME_COL, "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
new AggregatorFactory[0],
false
)
IncrementalIndexSchema.builder()
.withMinTimestamp(minTimestamp)
.withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null))
.withDimensionsSpec(
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.build()
)
.withRollup(false)
.build()
)
.setMaxRowCount(1000)
.build();

View File

@ -26,8 +26,6 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.ColumnType;
@ -478,15 +476,16 @@ public class NestedDataColumnIndexerV4Test extends InitializedNullHandlingTest
{
IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema(
minTimestamp,
new TimestampSpec(TIME_COL, "millis", null),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
new AggregatorFactory[0],
false
)
IncrementalIndexSchema.builder()
.withMinTimestamp(minTimestamp)
.withTimestampSpec(new TimestampSpec(TIME_COL, "millis", null))
.withDimensionsSpec(
DimensionsSpec.builder()
.useSchemaDiscovery(true)
.build()
)
.withRollup(false)
.build()
)
.setMaxRowCount(1000)
.build();

View File

@ -25,9 +25,12 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.LongMaxAggregator;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Rule;
@ -69,22 +72,39 @@ public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest
{
// Prepare the mocks & set close() call count expectation to 1
Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class);
EasyMock.expect(mockedAggregator.aggregateWithSize()).andReturn(0L).anyTimes();
mockedAggregator.close();
EasyMock.expectLastCall().times(1);
final IncrementalIndex genericIndex = indexCreator.createIndex(
EasyMock.replay(mockedAggregator);
final IncrementalIndex incrementalIndex = indexCreator.createIndex(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.MINUTE)
.withMetrics(new LongMaxAggregatorFactory("max", "max"))
.withMetrics(new LongMaxAggregatorFactory("max", "max")
{
@Override
protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector)
{
return mockedAggregator;
}
@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
return new AggregatorAndSize(mockedAggregator, Long.BYTES);
}
})
.build()
);
// This test is specific to the on-heap index
if (!(genericIndex instanceof OnheapIncrementalIndex)) {
if (!(incrementalIndex instanceof OnheapIncrementalIndex)) {
return;
}
final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex;
final OnheapIncrementalIndex index = (OnheapIncrementalIndex) incrementalIndex;
index.add(new MapBasedInputRow(
0,
@ -92,11 +112,7 @@ public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest
ImmutableMap.of("billy", 1, "max", 1)
));
// override the aggregators with the mocks
index.concurrentGet(0)[0] = mockedAggregator;
// close the indexer and validate the expectations
EasyMock.replay(mockedAggregator);
index.close();
EasyMock.verify(mockedAggregator);
}

View File

@ -28,10 +28,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
@ -80,15 +77,11 @@ public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingT
new StringDimensionSchema("string3", DimensionSchema.MultiValueHandling.SORTED_SET, true)
)
);
IncrementalIndexSchema schema = new IncrementalIndexSchema(
0,
new TimestampSpec("ds", "auto", null),
Granularities.ALL,
VirtualColumns.EMPTY,
dimensionsSpec,
new AggregatorFactory[0],
false
);
IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
.withDimensionsSpec(dimensionsSpec)
.withRollup(false)
.build();
Map<String, Object> map = new HashMap<String, Object>()
{
@Override

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -50,7 +49,6 @@ import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.TestObjectColumnSelector;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
@ -620,15 +618,10 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
// underlying dimension selector.
// This occurred during schemaless ingestion with spare dimension values and no explicit null rows, so the
// conditions are replicated by this test. See https://github.com/apache/druid/pull/10248 for details
IncrementalIndexSchema schema = new IncrementalIndexSchema(
0,
new TimestampSpec("time", "millis", DateTimes.nowUtc()),
Granularities.NONE,
VirtualColumns.EMPTY,
DimensionsSpec.EMPTY,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
true
);
IncrementalIndexSchema schema = IncrementalIndexSchema.builder()
.withTimestampSpec(new TimestampSpec("time", "millis", DateTimes.nowUtc()))
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.build();
IncrementalIndex index = new OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build();
index.add(