Merge pull request #2169 from navis/refactor-incremental-index3

Simplify information in IncrementalIndex
This commit is contained in:
Jonathan Wei 2016-01-11 18:17:39 -08:00
commit 419fa5adaf
7 changed files with 182 additions and 116 deletions

View File

@ -337,11 +337,12 @@ public class IndexGeneratorJob implements Jobby
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context)
throws IOException, InterruptedException
{
final List<String> dimensions = index.getDimensionNames();
Iterator<Row> rows = index.iterator();
while (rows.hasNext()) {
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
InputRow inputRow = getInputRowFromRow(row, dimensions);
context.write(
key,
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))

View File

@ -23,7 +23,6 @@ import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -34,6 +33,7 @@ import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@ -62,7 +62,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
@ -261,22 +260,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final QueryGranularity gran;
private final List<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final AggregatorType[] aggs;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final boolean deserializeComplexMetrics;
protected final CopyOnWriteArrayList<String> dimensions;
private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private volatile AtomicInteger numEntries = new AtomicInteger();
private final AtomicInteger numEntries = new AtomicInteger();
// This is modified on add() in a critical section.
private ThreadLocal<InputRow> in = new ThreadLocal<>();
private Supplier<InputRow> rowSupplier = new Supplier<InputRow>()
private final ThreadLocal<InputRow> in = new ThreadLocal<>();
private final Supplier<InputRow> rowSupplier = new Supplier<InputRow>()
{
@Override
public InputRow get()
@ -304,50 +299,33 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<String, String> metricTypesBuilder = ImmutableMap.builder();
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
for (int i = 0; i < metrics.length; i++) {
final String metricName = metrics[i].getName();
metricNamesBuilder.add(metricName);
metricIndexesBuilder.put(metricName, i);
metricTypesBuilder.put(metricName, metrics[i].getTypeName());
}
metricNames = metricNamesBuilder.build();
metricIndexes = metricIndexesBuilder.build();
metricTypes = metricTypesBuilder.build();
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<>(incrementalIndexSchema.getDimensionsSpec().getDimensions());
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else if (entry.getValue().equalsIgnoreCase("long")) {
type = ValueType.LONG;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
this.metricDescs = Maps.newLinkedHashMap();
for (AggregatorFactory metric : metrics) {
MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric);
metricDescs.put(metricDesc.getName(), metricDesc);
columnCapabilities.put(metricDesc.getName(), metricDesc.getCapabilities());
}
this.dimValues = new DimensionHolder();
for (String dimension : dimensions) {
DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec();
this.dimensionDescs = Maps.newLinkedHashMap();
for (String dimension : dimensionsSpec.getDimensions()) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
dimensionDescs.put(
dimension,
new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities)
);
columnCapabilities.put(dimension, capabilities);
dimensionOrder.put(dimension, dimensionOrder.size());
dimValues.add(dimension);
}
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = dimensionsSpec.getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
@ -357,12 +335,18 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
}
private DimDim newDimDim(String dimension)
{
return new NullValueConverterDimDim(makeDimDim(dimension));
}
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public abstract boolean canAppendRow();
public abstract String getOutOfRowsReason();
// use newDimDim
protected abstract DimDim makeDimDim(String dimension);
protected abstract AggregatorType[] initAggs(
@ -432,32 +416,38 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
String[][] dims;
List<String[]> overflow = null;
synchronized (dimensionOrder) {
dims = new String[dimensionOrder.size()][];
synchronized (dimensionDescs) {
dims = new String[dimensionDescs.size()][];
for (String dimension : rowDimensions) {
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
ColumnCapabilitiesImpl capabilities;
DimensionDesc desc = dimensionDescs.get(dimension);
if (desc != null) {
capabilities = desc.getCapabilities();
} else {
capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
}
if (dimensionValues.size() > 1) {
// Set column capabilities as data is coming in
if (!capabilities.hasMultipleValues() && dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
dimensions.add(dimension);
if (desc == null) {
desc = new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities);
dimensionDescs.put(dimension, desc);
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else if (index > dims.length || dims[index] != null) {
overflow.add(getDimVals(desc.getValues(), dimensionValues));
} else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) {
/*
* index > dims.length requires that we saw this dimension and added it to the dimensionOrder map,
* otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add,
@ -469,7 +459,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
*/
throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension);
} else {
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
dims[desc.getIndex()] = getDimVals(desc.getValues(), dimensionValues);
}
}
}
@ -551,19 +541,30 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return metrics;
}
public DimensionHolder getDimValues()
public List<String> getDimensionNames()
{
return dimValues;
synchronized (dimensionDescs) {
return ImmutableList.copyOf(dimensionDescs.keySet());
}
}
public List<String> getDimensions()
public List<DimensionDesc> getDimensions()
{
return dimensions;
synchronized (dimensionDescs) {
return ImmutableList.copyOf(dimensionDescs.values());
}
}
public DimensionDesc getDimension(String dimension)
{
synchronized (dimensionDescs) {
return dimensionDescs.get(dimension);
}
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
return metricDescs.get(metric).getType();
}
public Interval getInterval()
@ -581,24 +582,32 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return isEmpty() ? null : new DateTime(getMaxTimeMillis());
}
public DimDim getDimension(String dimension)
public DimDim getDimensionValues(String dimension)
{
return isEmpty() ? null : dimValues.get(dimension);
DimensionDesc dimSpec = getDimension(dimension);
return dimSpec == null ? null : dimSpec.getValues();
}
public Integer getDimensionIndex(String dimension)
{
return dimensionOrder.get(dimension);
DimensionDesc dimSpec = getDimension(dimension);
return dimSpec == null ? null : dimSpec.getIndex();
}
public List<String> getMetricNames()
{
return metricNames;
return ImmutableList.copyOf(metricDescs.keySet());
}
public List<MetricDesc> getMetrics()
{
return ImmutableList.copyOf(metricDescs.values());
}
public Integer getMetricIndex(String metricName)
{
return metricIndexes.get(metricName);
MetricDesc metSpec = metricDescs.get(metricName);
return metSpec == null ? null : metSpec.getIndex();
}
public ColumnCapabilities getCapabilities(String column)
@ -619,6 +628,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> postAggs)
{
final List<String> dimensions = getDimensionNames();
return new Iterable<Row>()
{
@Override
@ -641,8 +651,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
String[] dim = theDims[i];
if (dim != null && dim.length != 0) {
theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim));
}
else {
} else {
theVals.put(dimensions.get(i), null);
}
}
@ -671,30 +680,82 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return maxIngestedEventTime;
}
class DimensionHolder
public static class DimensionDesc
{
private final Map<String, DimDim> dimensions;
private final int index;
private final String name;
private final DimDim values;
private final ColumnCapabilitiesImpl capabilities;
DimensionHolder()
public DimensionDesc(int index, String name, DimDim values, ColumnCapabilitiesImpl capabilities)
{
dimensions = Maps.newConcurrentMap();
this.index = index;
this.name = name;
this.values = values;
this.capabilities = capabilities;
}
DimDim add(String dimension)
public int getIndex()
{
DimDim holder = dimensions.get(dimension);
if (holder == null) {
holder = new NullValueConverterDimDim(makeDimDim(dimension));
dimensions.put(dimension, holder);
return index;
}
public String getName()
{
return name;
}
public DimDim getValues()
{
return values;
}
public ColumnCapabilitiesImpl getCapabilities()
{
return capabilities;
}
}
public static class MetricDesc
{
private final int index;
private final String name;
private final String type;
private final ColumnCapabilitiesImpl capabilities;
public MetricDesc(int index, AggregatorFactory factory)
{
this.index = index;
this.name = factory.getName();
this.type = factory.getTypeName();
this.capabilities = new ColumnCapabilitiesImpl();
if (type.equalsIgnoreCase("float")) {
capabilities.setType(ValueType.FLOAT);
} else if (type.equalsIgnoreCase("long")) {
capabilities.setType(ValueType.LONG);
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
capabilities.setType(ValueType.COMPLEX);
}
return holder;
}
DimDim get(String dimension)
public int getIndex()
{
return dimensions.get(dimension);
return index;
}
public String getName()
{
return name;
}
public String getType()
{
return type;
}
public ColumnCapabilitiesImpl getCapabilities()
{
return capabilities;
}
}
@ -718,7 +779,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public void sort();
public boolean compareCannonicalValues(String s1, String s2);
public boolean compareCanonicalValues(String s1, String s2);
}
/**
@ -788,9 +849,9 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
@Override
public boolean compareCannonicalValues(String s1, String s2)
public boolean compareCanonicalValues(String s1, String s2)
{
return delegate.compareCannonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2));
return delegate.compareCanonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2));
}
}

View File

@ -46,6 +46,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
@ -66,17 +67,19 @@ public class IncrementalIndexAdapter implements IndexableAdapter
this.invertedIndexes = Maps.newHashMap();
for (String dimension : index.getDimensions()) {
invertedIndexes.put(dimension, Maps.<String, MutableBitmap>newHashMap());
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
invertedIndexes.put(dimension.getName(), Maps.<String, MutableBitmap>newHashMap());
}
int rowNum = 0;
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final String[][] dims = timeAndDims.getDims();
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension);
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
final int dimIndex = dimension.getIndex();
final Map<String, MutableBitmap> bitmapIndexes = invertedIndexes.get(dimension.getName());
if (bitmapIndexes == null || dims == null) {
log.error("bitmapIndexes and dims are null!");
@ -122,7 +125,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public Indexed<String> getDimensionNames()
{
return new ListIndexed<String>(index.getDimensions(), String.class);
return new ListIndexed<String>(index.getDimensionNames(), String.class);
}
@Override
@ -134,7 +137,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
dimDim.sort();
return new Indexed<String>()
@ -179,6 +182,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override
public Iterator<Rowboat> iterator()
{
final List<IncrementalIndex.DimensionDesc> dimensions = index.getDimensions();
/*
* Note that the transform function increments a counter to determine the rowNum of
* the iterated Rowboats. We need to return a new iterator on each
@ -200,9 +204,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter
final int rowOffset = input.getValue();
int[][] dims = new int[dimValues.length][];
for (String dimension : index.getDimensions()) {
int dimIndex = index.getDimensionIndex(dimension);
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
final int dimIndex = dimension.getIndex();
final IncrementalIndex.DimDim dimDim = dimension.getValues();
dimDim.sort();
if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) {

View File

@ -94,7 +94,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public Indexed<String> getAvailableDimensions()
{
return new ListIndexed<String>(index.getDimensions(), String.class);
return new ListIndexed<String>(index.getDimensionNames(), String.class);
}
@Override
@ -109,7 +109,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return Integer.MAX_VALUE;
}
IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
if (dimDim == null) {
return 0;
}
@ -312,7 +312,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
}
final IncrementalIndex.DimDim dimValLookup = index.getDimension(dimension);
final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension);
if (dimValLookup == null) {
return NULL_DIMENSION_SELECTOR;
}
@ -611,7 +611,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (dimIndexObject == null) {
return new BooleanValueMatcher(Strings.isNullOrEmpty(value));
}
final IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension);
if (!dimDim.contains(value)) {
if (Strings.isNullOrEmpty(value)) {
final int dimIndex = dimIndexObject;
@ -646,7 +646,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
for (String dimVal : dims[dimIndex]) {
if (dimDim.compareCannonicalValues(id, dimVal)) {
if (dimDim.compareCanonicalValues(id, dimVal)) {
return true;
}
}

View File

@ -293,7 +293,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private DimDim getDimDim(int dimIndex)
{
return getDimValues().get(getDimensions().get(dimIndex));
return getDimensionValues(getDimensionNames().get(dimIndex));
}
// MapDB forces serializers to implement serializable, which sucks
@ -470,7 +470,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
public boolean compareCannonicalValues(String s1, String s2)
public boolean compareCanonicalValues(String s1, String s2)
{
return s1.equals(s2);
}

View File

@ -340,7 +340,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
public boolean compareCannonicalValues(String s1, String s2)
public boolean compareCanonicalValues(String s1, String s2)
{
return s1 == s2;
}

View File

@ -194,7 +194,7 @@ public class IncrementalIndexTest
long timestamp = System.currentTimeMillis();
IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories);
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensions());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
Assert.assertEquals(2, index.size());
final Iterator<Row> rows = index.iterator();
@ -456,7 +456,7 @@ public class IncrementalIndexTest
}
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
Assert.assertEquals(dimensionCount, index.getDimensions().size());
Assert.assertEquals(dimensionCount, index.getDimensionNames().size());
Assert.assertEquals(elementsPerThread, index.size());
Iterator<Row> iterator = index.iterator();
int curr = 0;
@ -492,6 +492,6 @@ public class IncrementalIndexTest
true,
1000000
);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensions());
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
}
}