mirror of https://github.com/apache/druid.git
Simplify information in IncrementalIndex
This commit is contained in:
parent
8d1686039a
commit
976ebc45c0
|
@ -337,11 +337,12 @@ public class IndexGeneratorJob implements Jobby
|
||||||
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context)
|
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context)
|
||||||
throws IOException, InterruptedException
|
throws IOException, InterruptedException
|
||||||
{
|
{
|
||||||
|
final List<String> dimensions = index.getDimensionNames();
|
||||||
Iterator<Row> rows = index.iterator();
|
Iterator<Row> rows = index.iterator();
|
||||||
while (rows.hasNext()) {
|
while (rows.hasNext()) {
|
||||||
context.progress();
|
context.progress();
|
||||||
Row row = rows.next();
|
Row row = rows.next();
|
||||||
InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
|
InputRow inputRow = getInputRowFromRow(row, dimensions);
|
||||||
context.write(
|
context.write(
|
||||||
key,
|
key,
|
||||||
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
|
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
|
||||||
|
|
|
@ -23,7 +23,6 @@ import com.google.common.base.Function;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -34,6 +33,7 @@ import com.metamx.common.ISE;
|
||||||
import io.druid.data.input.InputRow;
|
import io.druid.data.input.InputRow;
|
||||||
import io.druid.data.input.MapBasedRow;
|
import io.druid.data.input.MapBasedRow;
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
|
import io.druid.data.input.impl.DimensionsSpec;
|
||||||
import io.druid.data.input.impl.SpatialDimensionSchema;
|
import io.druid.data.input.impl.SpatialDimensionSchema;
|
||||||
import io.druid.granularity.QueryGranularity;
|
import io.druid.granularity.QueryGranularity;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
@ -62,7 +62,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentNavigableMap;
|
import java.util.concurrent.ConcurrentNavigableMap;
|
||||||
|
@ -261,22 +260,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
private final QueryGranularity gran;
|
private final QueryGranularity gran;
|
||||||
private final List<Function<InputRow, InputRow>> rowTransformers;
|
private final List<Function<InputRow, InputRow>> rowTransformers;
|
||||||
private final AggregatorFactory[] metrics;
|
private final AggregatorFactory[] metrics;
|
||||||
private final Map<String, Integer> metricIndexes;
|
|
||||||
private final Map<String, String> metricTypes;
|
|
||||||
private final ImmutableList<String> metricNames;
|
|
||||||
private final LinkedHashMap<String, Integer> dimensionOrder;
|
|
||||||
private final AggregatorType[] aggs;
|
private final AggregatorType[] aggs;
|
||||||
private final DimensionHolder dimValues;
|
|
||||||
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
|
|
||||||
private final boolean deserializeComplexMetrics;
|
private final boolean deserializeComplexMetrics;
|
||||||
|
|
||||||
protected final CopyOnWriteArrayList<String> dimensions;
|
private final Map<String, MetricDesc> metricDescs;
|
||||||
|
private final Map<String, DimensionDesc> dimensionDescs;
|
||||||
|
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
|
||||||
|
|
||||||
private volatile AtomicInteger numEntries = new AtomicInteger();
|
private final AtomicInteger numEntries = new AtomicInteger();
|
||||||
|
|
||||||
// This is modified on add() in a critical section.
|
// This is modified on add() in a critical section.
|
||||||
private ThreadLocal<InputRow> in = new ThreadLocal<>();
|
private final ThreadLocal<InputRow> in = new ThreadLocal<>();
|
||||||
private Supplier<InputRow> rowSupplier = new Supplier<InputRow>()
|
private final Supplier<InputRow> rowSupplier = new Supplier<InputRow>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public InputRow get()
|
public InputRow get()
|
||||||
|
@ -304,50 +299,33 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
this.rowTransformers = new CopyOnWriteArrayList<>();
|
this.rowTransformers = new CopyOnWriteArrayList<>();
|
||||||
this.deserializeComplexMetrics = deserializeComplexMetrics;
|
this.deserializeComplexMetrics = deserializeComplexMetrics;
|
||||||
|
|
||||||
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
|
|
||||||
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
|
|
||||||
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
|
|
||||||
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
|
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
|
||||||
|
|
||||||
for (int i = 0; i < metrics.length; i++) {
|
|
||||||
final String metricName = metrics[i].getName();
|
|
||||||
metricNamesBuilder.add(metricName);
|
|
||||||
metricIndexesBuilder.put(metricName, i);
|
|
||||||
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
|
|
||||||
}
|
|
||||||
metricNames = metricNamesBuilder.build();
|
|
||||||
metricIndexes = metricIndexesBuilder.build();
|
|
||||||
metricTypes = metricTypesBuilder.build();
|
|
||||||
|
|
||||||
this.dimensionOrder = Maps.newLinkedHashMap();
|
|
||||||
this.dimensions = new CopyOnWriteArrayList<>(incrementalIndexSchema.getDimensionsSpec().getDimensions());
|
|
||||||
// This should really be more generic
|
|
||||||
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
|
|
||||||
if (!spatialDimensions.isEmpty()) {
|
|
||||||
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
|
|
||||||
}
|
|
||||||
|
|
||||||
this.columnCapabilities = Maps.newHashMap();
|
this.columnCapabilities = Maps.newHashMap();
|
||||||
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
|
|
||||||
ValueType type;
|
this.metricDescs = Maps.newLinkedHashMap();
|
||||||
if (entry.getValue().equalsIgnoreCase("float")) {
|
for (AggregatorFactory metric : metrics) {
|
||||||
type = ValueType.FLOAT;
|
MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
|
||||||
} else if (entry.getValue().equalsIgnoreCase("long")) {
|
metricDescs.put(metricDesc.getName(), metricDesc);
|
||||||
type = ValueType.LONG;
|
columnCapabilities.put(metricDesc.getName(), metricDesc.getCapabilities());
|
||||||
} else {
|
|
||||||
type = ValueType.COMPLEX;
|
|
||||||
}
|
|
||||||
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
|
|
||||||
capabilities.setType(type);
|
|
||||||
columnCapabilities.put(entry.getKey(), capabilities);
|
|
||||||
}
|
}
|
||||||
this.dimValues = new DimensionHolder();
|
|
||||||
for (String dimension : dimensions) {
|
DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec();
|
||||||
|
|
||||||
|
this.dimensionDescs = Maps.newLinkedHashMap();
|
||||||
|
for (String dimension : dimensionsSpec.getDimensions()) {
|
||||||
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
|
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
|
||||||
capabilities.setType(ValueType.STRING);
|
capabilities.setType(ValueType.STRING);
|
||||||
|
dimensionDescs.put(
|
||||||
|
dimension,
|
||||||
|
new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities)
|
||||||
|
);
|
||||||
columnCapabilities.put(dimension, capabilities);
|
columnCapabilities.put(dimension, capabilities);
|
||||||
dimensionOrder.put(dimension, dimensionOrder.size());
|
}
|
||||||
dimValues.add(dimension);
|
|
||||||
|
// This should really be more generic
|
||||||
|
List<SpatialDimensionSchema> spatialDimensions = dimensionsSpec.getSpatialDimensions();
|
||||||
|
if (!spatialDimensions.isEmpty()) {
|
||||||
|
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
|
||||||
}
|
}
|
||||||
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
|
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
|
||||||
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
|
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
|
||||||
|
@ -357,12 +335,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private DimDim newDimDim(String dimension)
|
||||||
|
{
|
||||||
|
return new NullValueConverterDimDim(makeDimDim(dimension));
|
||||||
|
}
|
||||||
|
|
||||||
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
|
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
|
||||||
|
|
||||||
public abstract boolean canAppendRow();
|
public abstract boolean canAppendRow();
|
||||||
|
|
||||||
public abstract String getOutOfRowsReason();
|
public abstract String getOutOfRowsReason();
|
||||||
|
|
||||||
|
// use newDimDim
|
||||||
protected abstract DimDim makeDimDim(String dimension);
|
protected abstract DimDim makeDimDim(String dimension);
|
||||||
|
|
||||||
protected abstract AggregatorType[] initAggs(
|
protected abstract AggregatorType[] initAggs(
|
||||||
|
@ -432,32 +416,38 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
|
|
||||||
String[][] dims;
|
String[][] dims;
|
||||||
List<String[]> overflow = null;
|
List<String[]> overflow = null;
|
||||||
synchronized (dimensionOrder) {
|
synchronized (dimensionDescs) {
|
||||||
dims = new String[dimensionOrder.size()][];
|
dims = new String[dimensionDescs.size()][];
|
||||||
for (String dimension : rowDimensions) {
|
for (String dimension : rowDimensions) {
|
||||||
List<String> dimensionValues = row.getDimension(dimension);
|
List<String> dimensionValues = row.getDimension(dimension);
|
||||||
|
|
||||||
// Set column capabilities as data is coming in
|
ColumnCapabilitiesImpl capabilities;
|
||||||
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
|
DimensionDesc desc = dimensionDescs.get(dimension);
|
||||||
if (capabilities == null) {
|
if (desc != null) {
|
||||||
capabilities = new ColumnCapabilitiesImpl();
|
capabilities = desc.getCapabilities();
|
||||||
capabilities.setType(ValueType.STRING);
|
} else {
|
||||||
columnCapabilities.put(dimension, capabilities);
|
capabilities = columnCapabilities.get(dimension);
|
||||||
|
if (capabilities == null) {
|
||||||
|
capabilities = new ColumnCapabilitiesImpl();
|
||||||
|
capabilities.setType(ValueType.STRING);
|
||||||
|
columnCapabilities.put(dimension, capabilities);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (dimensionValues.size() > 1) {
|
|
||||||
|
// Set column capabilities as data is coming in
|
||||||
|
if (!capabilities.hasMultipleValues() && dimensionValues.size() > 1) {
|
||||||
capabilities.setHasMultipleValues(true);
|
capabilities.setHasMultipleValues(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer index = dimensionOrder.get(dimension);
|
if (desc == null) {
|
||||||
if (index == null) {
|
desc = new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities);
|
||||||
dimensionOrder.put(dimension, dimensionOrder.size());
|
dimensionDescs.put(dimension, desc);
|
||||||
dimensions.add(dimension);
|
|
||||||
|
|
||||||
if (overflow == null) {
|
if (overflow == null) {
|
||||||
overflow = Lists.newArrayList();
|
overflow = Lists.newArrayList();
|
||||||
}
|
}
|
||||||
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
|
overflow.add(getDimVals(desc.getValues(), dimensionValues));
|
||||||
} else if (index > dims.length || dims[index] != null) {
|
} else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) {
|
||||||
/*
|
/*
|
||||||
* index > dims.length requires that we saw this dimension and added it to the dimensionOrder map,
|
* index > dims.length requires that we saw this dimension and added it to the dimensionOrder map,
|
||||||
* otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add,
|
* otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add,
|
||||||
|
@ -469,7 +459,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
*/
|
*/
|
||||||
throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
|
throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
|
||||||
} else {
|
} else {
|
||||||
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
|
dims[desc.getIndex()] = getDimVals(desc.getValues(), dimensionValues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -551,19 +541,30 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DimensionHolder getDimValues()
|
public List<String> getDimensionNames()
|
||||||
{
|
{
|
||||||
return dimValues;
|
synchronized (dimensionDescs) {
|
||||||
|
return ImmutableList.copyOf(dimensionDescs.keySet());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getDimensions()
|
public List<DimensionDesc> getDimensions()
|
||||||
{
|
{
|
||||||
return dimensions;
|
synchronized (dimensionDescs) {
|
||||||
|
return ImmutableList.copyOf(dimensionDescs.values());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public DimensionDesc getDimension(String dimension)
|
||||||
|
{
|
||||||
|
synchronized (dimensionDescs) {
|
||||||
|
return dimensionDescs.get(dimension);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getMetricType(String metric)
|
public String getMetricType(String metric)
|
||||||
{
|
{
|
||||||
return metricTypes.get(metric);
|
return metricDescs.get(metric).getType();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Interval getInterval()
|
public Interval getInterval()
|
||||||
|
@ -581,24 +582,32 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
|
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
public DimDim getDimension(String dimension)
|
public DimDim getDimensionValues(String dimension)
|
||||||
{
|
{
|
||||||
return isEmpty() ? null : dimValues.get(dimension);
|
DimensionDesc dimSpec = getDimension(dimension);
|
||||||
|
return dimSpec == null ? null : dimSpec.getValues();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getDimensionIndex(String dimension)
|
public Integer getDimensionIndex(String dimension)
|
||||||
{
|
{
|
||||||
return dimensionOrder.get(dimension);
|
DimensionDesc dimSpec = getDimension(dimension);
|
||||||
|
return dimSpec == null ? null : dimSpec.getIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getMetricNames()
|
public List<String> getMetricNames()
|
||||||
{
|
{
|
||||||
return metricNames;
|
return ImmutableList.copyOf(metricDescs.keySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MetricDesc> getMetrics()
|
||||||
|
{
|
||||||
|
return ImmutableList.copyOf(metricDescs.values());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getMetricIndex(String metricName)
|
public Integer getMetricIndex(String metricName)
|
||||||
{
|
{
|
||||||
return metricIndexes.get(metricName);
|
MetricDesc metSpec = metricDescs.get(metricName);
|
||||||
|
return metSpec == null ? null : metSpec.getIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ColumnCapabilities getCapabilities(String column)
|
public ColumnCapabilities getCapabilities(String column)
|
||||||
|
@ -619,6 +628,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
|
|
||||||
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
|
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
|
||||||
{
|
{
|
||||||
|
final List<String> dimensions = getDimensionNames();
|
||||||
return new Iterable<Row>()
|
return new Iterable<Row>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
|
@ -641,8 +651,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
String[] dim = theDims[i];
|
String[] dim = theDims[i];
|
||||||
if (dim != null && dim.length != 0) {
|
if (dim != null && dim.length != 0) {
|
||||||
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
|
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
theVals.put(dimensions.get(i), null);
|
theVals.put(dimensions.get(i), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -671,30 +680,82 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
return maxIngestedEventTime;
|
return maxIngestedEventTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
class DimensionHolder
|
public static class DimensionDesc
|
||||||
{
|
{
|
||||||
private final Map<String, DimDim> dimensions;
|
private final int index;
|
||||||
|
private final String name;
|
||||||
|
private final DimDim values;
|
||||||
|
private final ColumnCapabilitiesImpl capabilities;
|
||||||
|
|
||||||
DimensionHolder()
|
public DimensionDesc(int index, String name, DimDim values, ColumnCapabilitiesImpl capabilities)
|
||||||
{
|
{
|
||||||
dimensions = Maps.newConcurrentMap();
|
this.index = index;
|
||||||
|
this.name = name;
|
||||||
|
this.values = values;
|
||||||
|
this.capabilities = capabilities;
|
||||||
}
|
}
|
||||||
|
|
||||||
DimDim add(String dimension)
|
public int getIndex()
|
||||||
{
|
{
|
||||||
DimDim holder = dimensions.get(dimension);
|
return index;
|
||||||
if (holder == null) {
|
}
|
||||||
holder = new NullValueConverterDimDim(makeDimDim(dimension));
|
|
||||||
dimensions.put(dimension, holder);
|
public String getName()
|
||||||
|
{
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DimDim getValues()
|
||||||
|
{
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ColumnCapabilitiesImpl getCapabilities()
|
||||||
|
{
|
||||||
|
return capabilities;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MetricDesc
|
||||||
|
{
|
||||||
|
private final int index;
|
||||||
|
private final String name;
|
||||||
|
private final String type;
|
||||||
|
private final ColumnCapabilitiesImpl capabilities;
|
||||||
|
|
||||||
|
public MetricDesc(int index, AggregatorFactory factory)
|
||||||
|
{
|
||||||
|
this.index = index;
|
||||||
|
this.name = factory.getName();
|
||||||
|
this.type = factory.getTypeName();
|
||||||
|
this.capabilities = new ColumnCapabilitiesImpl();
|
||||||
|
if (type.equalsIgnoreCase("float")) {
|
||||||
|
capabilities.setType(ValueType.FLOAT);
|
||||||
|
} else if (type.equalsIgnoreCase("long")) {
|
||||||
|
capabilities.setType(ValueType.LONG);
|
||||||
} else {
|
} else {
|
||||||
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
|
capabilities.setType(ValueType.COMPLEX);
|
||||||
}
|
}
|
||||||
return holder;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DimDim get(String dimension)
|
public int getIndex()
|
||||||
{
|
{
|
||||||
return dimensions.get(dimension);
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName()
|
||||||
|
{
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType()
|
||||||
|
{
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ColumnCapabilitiesImpl getCapabilities()
|
||||||
|
{
|
||||||
|
return capabilities;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,7 +779,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
|
|
||||||
public void sort();
|
public void sort();
|
||||||
|
|
||||||
public boolean compareCannonicalValues(String s1, String s2);
|
public boolean compareCanonicalValues(String s1, String s2);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -788,9 +849,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean compareCannonicalValues(String s1, String s2)
|
public boolean compareCanonicalValues(String s1, String s2)
|
||||||
{
|
{
|
||||||
return delegate.compareCannonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2));
|
return delegate.compareCanonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,7 @@ import javax.annotation.Nullable;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -66,17 +67,19 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
|
|
||||||
this.invertedIndexes = Maps.newHashMap();
|
this.invertedIndexes = Maps.newHashMap();
|
||||||
|
|
||||||
for (String dimension : index.getDimensions()) {
|
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
|
||||||
invertedIndexes.put(dimension, Maps.<String, MutableBitmap>newHashMap());
|
|
||||||
|
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
|
||||||
|
invertedIndexes.put(dimension.getName(), Maps.<String, MutableBitmap>newHashMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
int rowNum = 0;
|
int rowNum = 0;
|
||||||
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
|
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
|
||||||
final String[][] dims = timeAndDims.getDims();
|
final String[][] dims = timeAndDims.getDims();
|
||||||
|
|
||||||
for (String dimension : index.getDimensions()) {
|
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
|
||||||
int dimIndex = index.getDimensionIndex(dimension);
|
final int dimIndex = dimension.getIndex();
|
||||||
Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension);
|
final Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension.getName());
|
||||||
|
|
||||||
if (bitmapIndexes == null || dims == null) {
|
if (bitmapIndexes == null || dims == null) {
|
||||||
log.error("bitmapIndexes and dims are null!");
|
log.error("bitmapIndexes and dims are null!");
|
||||||
|
@ -122,7 +125,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
@Override
|
@Override
|
||||||
public Indexed<String> getDimensionNames()
|
public Indexed<String> getDimensionNames()
|
||||||
{
|
{
|
||||||
return new ListIndexed<String>(index.getDimensions(), String.class);
|
return new ListIndexed<String>(index.getDimensionNames(), String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -134,7 +137,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
@Override
|
@Override
|
||||||
public Indexed<String> getDimValueLookup(String dimension)
|
public Indexed<String> getDimValueLookup(String dimension)
|
||||||
{
|
{
|
||||||
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
|
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
|
||||||
dimDim.sort();
|
dimDim.sort();
|
||||||
|
|
||||||
return new Indexed<String>()
|
return new Indexed<String>()
|
||||||
|
@ -179,6 +182,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
@Override
|
@Override
|
||||||
public Iterator<Rowboat> iterator()
|
public Iterator<Rowboat> iterator()
|
||||||
{
|
{
|
||||||
|
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
|
||||||
/*
|
/*
|
||||||
* Note that the transform function increments a counter to determine the rowNum of
|
* Note that the transform function increments a counter to determine the rowNum of
|
||||||
* the iterated Rowboats. We need to return a new iterator on each
|
* the iterated Rowboats. We need to return a new iterator on each
|
||||||
|
@ -200,9 +204,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
final int rowOffset = input.getValue();
|
final int rowOffset = input.getValue();
|
||||||
|
|
||||||
int[][] dims = new int[dimValues.length][];
|
int[][] dims = new int[dimValues.length][];
|
||||||
for (String dimension : index.getDimensions()) {
|
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
|
||||||
int dimIndex = index.getDimensionIndex(dimension);
|
final int dimIndex = dimension.getIndex();
|
||||||
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
|
final IncrementalIndex.DimDim dimDim = dimension.getValues();
|
||||||
dimDim.sort();
|
dimDim.sort();
|
||||||
|
|
||||||
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {
|
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
@Override
|
@Override
|
||||||
public Indexed<String> getAvailableDimensions()
|
public Indexed<String> getAvailableDimensions()
|
||||||
{
|
{
|
||||||
return new ListIndexed<String>(index.getDimensions(), String.class);
|
return new ListIndexed<String>(index.getDimensionNames(), String.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -109,7 +109,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
|
IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
|
||||||
if (dimDim == null) {
|
if (dimDim == null) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -312,7 +312,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
|
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
|
||||||
}
|
}
|
||||||
|
|
||||||
final IncrementalIndex.DimDim dimValLookup = index.getDimension(dimension);
|
final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension);
|
||||||
if (dimValLookup == null) {
|
if (dimValLookup == null) {
|
||||||
return NULL_DIMENSION_SELECTOR;
|
return NULL_DIMENSION_SELECTOR;
|
||||||
}
|
}
|
||||||
|
@ -611,7 +611,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
if (dimIndexObject == null) {
|
if (dimIndexObject == null) {
|
||||||
return new BooleanValueMatcher(Strings.isNullOrEmpty(value));
|
return new BooleanValueMatcher(Strings.isNullOrEmpty(value));
|
||||||
}
|
}
|
||||||
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
|
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
|
||||||
if (!dimDim.contains(value)) {
|
if (!dimDim.contains(value)) {
|
||||||
if (Strings.isNullOrEmpty(value)) {
|
if (Strings.isNullOrEmpty(value)) {
|
||||||
final int dimIndex = dimIndexObject;
|
final int dimIndex = dimIndexObject;
|
||||||
|
@ -646,7 +646,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String dimVal : dims[dimIndex]) {
|
for (String dimVal : dims[dimIndex]) {
|
||||||
if (dimDim.compareCannonicalValues(id, dimVal)) {
|
if (dimDim.compareCanonicalValues(id, dimVal)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -293,7 +293,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
|
||||||
|
|
||||||
private DimDim getDimDim(int dimIndex)
|
private DimDim getDimDim(int dimIndex)
|
||||||
{
|
{
|
||||||
return getDimValues().get(getDimensions().get(dimIndex));
|
return getDimensionValues(getDimensionNames().get(dimIndex));
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapDB forces serializers to implement serializable, which sucks
|
// MapDB forces serializers to implement serializable, which sucks
|
||||||
|
@ -470,7 +470,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean compareCannonicalValues(String s1, String s2)
|
public boolean compareCanonicalValues(String s1, String s2)
|
||||||
{
|
{
|
||||||
return s1.equals(s2);
|
return s1.equals(s2);
|
||||||
}
|
}
|
||||||
|
|
|
@ -340,7 +340,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean compareCannonicalValues(String s1, String s2)
|
public boolean compareCanonicalValues(String s1, String s2)
|
||||||
{
|
{
|
||||||
return s1 == s2;
|
return s1 == s2;
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,7 +194,7 @@ public class IncrementalIndexTest
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
|
IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
|
||||||
populateIndex(timestamp, index);
|
populateIndex(timestamp, index);
|
||||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensions());
|
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
|
||||||
Assert.assertEquals(2, index.size());
|
Assert.assertEquals(2, index.size());
|
||||||
|
|
||||||
final Iterator<Row> rows = index.iterator();
|
final Iterator<Row> rows = index.iterator();
|
||||||
|
@ -456,7 +456,7 @@ public class IncrementalIndexTest
|
||||||
}
|
}
|
||||||
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
|
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
|
||||||
|
|
||||||
Assert.assertEquals(dimensionCount, index.getDimensions().size());
|
Assert.assertEquals(dimensionCount, index.getDimensionNames().size());
|
||||||
Assert.assertEquals(elementsPerThread, index.size());
|
Assert.assertEquals(elementsPerThread, index.size());
|
||||||
Iterator<Row> iterator = index.iterator();
|
Iterator<Row> iterator = index.iterator();
|
||||||
int curr = 0;
|
int curr = 0;
|
||||||
|
@ -492,6 +492,6 @@ public class IncrementalIndexTest
|
||||||
true,
|
true,
|
||||||
1000000
|
1000000
|
||||||
);
|
);
|
||||||
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensions());
|
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue