From 976ebc45c08f4a2bdbe6f6752c04b391f136ba11 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 29 Dec 2015 13:39:37 +0900 Subject: [PATCH] Simplify information in IncrementalIndex --- .../io/druid/indexer/IndexGeneratorJob.java | 3 +- .../segment/incremental/IncrementalIndex.java | 249 +++++++++++------- .../incremental/IncrementalIndexAdapter.java | 24 +- .../IncrementalIndexStorageAdapter.java | 10 +- .../incremental/OffheapIncrementalIndex.java | 4 +- .../incremental/OnheapIncrementalIndex.java | 2 +- .../segment/data/IncrementalIndexTest.java | 6 +- 7 files changed, 182 insertions(+), 116 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 980f981396e..28b4abb83e2 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -337,11 +337,12 @@ public class IndexGeneratorJob implements Jobby private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException { + final List dimensions = index.getDimensionNames(); Iterator rows = index.iterator(); while (rows.hasNext()) { context.progress(); Row row = rows.next(); - InputRow inputRow = getInputRowFromRow(row, index.getDimensions()); + InputRow inputRow = getInputRowFromRow(row, dimensions); context.write( key, new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs)) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index cf7ff364c8c..53ae2753ce6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -34,6 +33,7 @@ import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; @@ -62,7 +62,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; @@ -261,22 +260,18 @@ public abstract class IncrementalIndex implements Iterable, private final QueryGranularity gran; private final List> rowTransformers; private final AggregatorFactory[] metrics; - private final Map metricIndexes; - private final Map metricTypes; - private final ImmutableList metricNames; - private final LinkedHashMap dimensionOrder; private final AggregatorType[] aggs; - private final DimensionHolder dimValues; - private final Map columnCapabilities; private final boolean deserializeComplexMetrics; - protected final CopyOnWriteArrayList dimensions; + private final Map metricDescs; + private final Map dimensionDescs; + private final Map columnCapabilities; - private volatile AtomicInteger numEntries = new AtomicInteger(); + private final AtomicInteger numEntries = new AtomicInteger(); // This is modified on add() in a critical section. - private ThreadLocal in = new ThreadLocal<>(); - private Supplier rowSupplier = new Supplier() + private final ThreadLocal in = new ThreadLocal<>(); + private final Supplier rowSupplier = new Supplier() { @Override public InputRow get() @@ -304,50 +299,33 @@ public abstract class IncrementalIndex implements Iterable, this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; - final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); - final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); - final ImmutableMap.Builder metricTypesBuilder = ImmutableMap.builder(); this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); - - for (int i = 0; i < metrics.length; i++) { - final String metricName = metrics[i].getName(); - metricNamesBuilder.add(metricName); - metricIndexesBuilder.put(metricName, i); - metricTypesBuilder.put(metricName, metrics[i].getTypeName()); - } - metricNames = metricNamesBuilder.build(); - metricIndexes = metricIndexesBuilder.build(); - metricTypes = metricTypesBuilder.build(); - - this.dimensionOrder = Maps.newLinkedHashMap(); - this.dimensions = new CopyOnWriteArrayList<>(incrementalIndexSchema.getDimensionsSpec().getDimensions()); - // This should really be more generic - List spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions(); - if (!spatialDimensions.isEmpty()) { - this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); - } - this.columnCapabilities = Maps.newHashMap(); - for (Map.Entry entry : metricTypes.entrySet()) { - ValueType type; - if (entry.getValue().equalsIgnoreCase("float")) { - type = ValueType.FLOAT; - } else if (entry.getValue().equalsIgnoreCase("long")) { - type = ValueType.LONG; - } else { - type = ValueType.COMPLEX; - } - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(type); - columnCapabilities.put(entry.getKey(), capabilities); + + this.metricDescs = Maps.newLinkedHashMap(); + for (AggregatorFactory metric : metrics) { + MetricDesc metricDesc = new MetricDesc(metricDescs.size(), metric); + metricDescs.put(metricDesc.getName(), metricDesc); + columnCapabilities.put(metricDesc.getName(), metricDesc.getCapabilities()); } - this.dimValues = new DimensionHolder(); - for (String dimension : dimensions) { + + DimensionsSpec dimensionsSpec = incrementalIndexSchema.getDimensionsSpec(); + + this.dimensionDescs = Maps.newLinkedHashMap(); + for (String dimension : dimensionsSpec.getDimensions()) { ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); capabilities.setType(ValueType.STRING); + dimensionDescs.put( + dimension, + new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities) + ); columnCapabilities.put(dimension, capabilities); - dimensionOrder.put(dimension, dimensionOrder.size()); - dimValues.add(dimension); + } + + // This should really be more generic + List spatialDimensions = dimensionsSpec.getSpatialDimensions(); + if (!spatialDimensions.isEmpty()) { + this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions)); } for (SpatialDimensionSchema spatialDimension : spatialDimensions) { ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); @@ -357,12 +335,18 @@ public abstract class IncrementalIndex implements Iterable, } } + private DimDim newDimDim(String dimension) + { + return new NullValueConverterDimDim(makeDimDim(dimension)); + } + public abstract ConcurrentNavigableMap getFacts(); public abstract boolean canAppendRow(); public abstract String getOutOfRowsReason(); + // use newDimDim protected abstract DimDim makeDimDim(String dimension); protected abstract AggregatorType[] initAggs( @@ -432,32 +416,38 @@ public abstract class IncrementalIndex implements Iterable, String[][] dims; List overflow = null; - synchronized (dimensionOrder) { - dims = new String[dimensionOrder.size()][]; + synchronized (dimensionDescs) { + dims = new String[dimensionDescs.size()][]; for (String dimension : rowDimensions) { List dimensionValues = row.getDimension(dimension); - // Set column capabilities as data is coming in - ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); - if (capabilities == null) { - capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dimension, capabilities); + ColumnCapabilitiesImpl capabilities; + DimensionDesc desc = dimensionDescs.get(dimension); + if (desc != null) { + capabilities = desc.getCapabilities(); + } else { + capabilities = columnCapabilities.get(dimension); + if (capabilities == null) { + capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dimension, capabilities); + } } - if (dimensionValues.size() > 1) { + + // Set column capabilities as data is coming in + if (!capabilities.hasMultipleValues() && dimensionValues.size() > 1) { capabilities.setHasMultipleValues(true); } - Integer index = dimensionOrder.get(dimension); - if (index == null) { - dimensionOrder.put(dimension, dimensionOrder.size()); - dimensions.add(dimension); + if (desc == null) { + desc = new DimensionDesc(dimensionDescs.size(), dimension, newDimDim(dimension), capabilities); + dimensionDescs.put(dimension, desc); if (overflow == null) { overflow = Lists.newArrayList(); } - overflow.add(getDimVals(dimValues.add(dimension), dimensionValues)); - } else if (index > dims.length || dims[index] != null) { + overflow.add(getDimVals(desc.getValues(), dimensionValues)); + } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) { /* * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map, * otherwise index is null. Since dims is initialized based on the size of dimensionOrder on each call to add, @@ -469,7 +459,7 @@ public abstract class IncrementalIndex implements Iterable, */ throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); } else { - dims[index] = getDimVals(dimValues.get(dimension), dimensionValues); + dims[desc.getIndex()] = getDimVals(desc.getValues(), dimensionValues); } } } @@ -551,19 +541,30 @@ public abstract class IncrementalIndex implements Iterable, return metrics; } - public DimensionHolder getDimValues() + public List getDimensionNames() { - return dimValues; + synchronized (dimensionDescs) { + return ImmutableList.copyOf(dimensionDescs.keySet()); + } } - public List getDimensions() + public List getDimensions() { - return dimensions; + synchronized (dimensionDescs) { + return ImmutableList.copyOf(dimensionDescs.values()); + } + } + + public DimensionDesc getDimension(String dimension) + { + synchronized (dimensionDescs) { + return dimensionDescs.get(dimension); + } } public String getMetricType(String metric) { - return metricTypes.get(metric); + return metricDescs.get(metric).getType(); } public Interval getInterval() @@ -581,24 +582,32 @@ public abstract class IncrementalIndex implements Iterable, return isEmpty() ? null : new DateTime(getMaxTimeMillis()); } - public DimDim getDimension(String dimension) + public DimDim getDimensionValues(String dimension) { - return isEmpty() ? null : dimValues.get(dimension); + DimensionDesc dimSpec = getDimension(dimension); + return dimSpec == null ? null : dimSpec.getValues(); } public Integer getDimensionIndex(String dimension) { - return dimensionOrder.get(dimension); + DimensionDesc dimSpec = getDimension(dimension); + return dimSpec == null ? null : dimSpec.getIndex(); } public List getMetricNames() { - return metricNames; + return ImmutableList.copyOf(metricDescs.keySet()); + } + + public List getMetrics() + { + return ImmutableList.copyOf(metricDescs.values()); } public Integer getMetricIndex(String metricName) { - return metricIndexes.get(metricName); + MetricDesc metSpec = metricDescs.get(metricName); + return metSpec == null ? null : metSpec.getIndex(); } public ColumnCapabilities getCapabilities(String column) @@ -619,6 +628,7 @@ public abstract class IncrementalIndex implements Iterable, public Iterable iterableWithPostAggregations(final List postAggs) { + final List dimensions = getDimensionNames(); return new Iterable() { @Override @@ -641,8 +651,7 @@ public abstract class IncrementalIndex implements Iterable, String[] dim = theDims[i]; if (dim != null && dim.length != 0) { theVals.put(dimensions.get(i), dim.length == 1 ? dim[0] : Arrays.asList(dim)); - } - else { + } else { theVals.put(dimensions.get(i), null); } } @@ -671,30 +680,82 @@ public abstract class IncrementalIndex implements Iterable, return maxIngestedEventTime; } - class DimensionHolder + public static class DimensionDesc { - private final Map dimensions; + private final int index; + private final String name; + private final DimDim values; + private final ColumnCapabilitiesImpl capabilities; - DimensionHolder() + public DimensionDesc(int index, String name, DimDim values, ColumnCapabilitiesImpl capabilities) { - dimensions = Maps.newConcurrentMap(); + this.index = index; + this.name = name; + this.values = values; + this.capabilities = capabilities; } - DimDim add(String dimension) + public int getIndex() { - DimDim holder = dimensions.get(dimension); - if (holder == null) { - holder = new NullValueConverterDimDim(makeDimDim(dimension)); - dimensions.put(dimension, holder); + return index; + } + + public String getName() + { + return name; + } + + public DimDim getValues() + { + return values; + } + + public ColumnCapabilitiesImpl getCapabilities() + { + return capabilities; + } + } + + public static class MetricDesc + { + private final int index; + private final String name; + private final String type; + private final ColumnCapabilitiesImpl capabilities; + + public MetricDesc(int index, AggregatorFactory factory) + { + this.index = index; + this.name = factory.getName(); + this.type = factory.getTypeName(); + this.capabilities = new ColumnCapabilitiesImpl(); + if (type.equalsIgnoreCase("float")) { + capabilities.setType(ValueType.FLOAT); + } else if (type.equalsIgnoreCase("long")) { + capabilities.setType(ValueType.LONG); } else { - throw new ISE("dimension[%s] already existed even though add() was called!?", dimension); + capabilities.setType(ValueType.COMPLEX); } - return holder; } - DimDim get(String dimension) + public int getIndex() { - return dimensions.get(dimension); + return index; + } + + public String getName() + { + return name; + } + + public String getType() + { + return type; + } + + public ColumnCapabilitiesImpl getCapabilities() + { + return capabilities; } } @@ -718,7 +779,7 @@ public abstract class IncrementalIndex implements Iterable, public void sort(); - public boolean compareCannonicalValues(String s1, String s2); + public boolean compareCanonicalValues(String s1, String s2); } /** @@ -788,9 +849,9 @@ public abstract class IncrementalIndex implements Iterable, } @Override - public boolean compareCannonicalValues(String s1, String s2) + public boolean compareCanonicalValues(String s1, String s2) { - return delegate.compareCannonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2)); + return delegate.compareCanonicalValues(Strings.nullToEmpty(s1), Strings.nullToEmpty(s2)); } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 28e903d7aef..c6c1e33a4ae 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -46,6 +46,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -66,17 +67,19 @@ public class IncrementalIndexAdapter implements IndexableAdapter this.invertedIndexes = Maps.newHashMap(); - for (String dimension : index.getDimensions()) { - invertedIndexes.put(dimension, Maps.newHashMap()); + final List dimensions = index.getDimensions(); + + for (IncrementalIndex.DimensionDesc dimension : dimensions) { + invertedIndexes.put(dimension.getName(), Maps.newHashMap()); } int rowNum = 0; for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) { final String[][] dims = timeAndDims.getDims(); - for (String dimension : index.getDimensions()) { - int dimIndex = index.getDimensionIndex(dimension); - Map bitmapIndexes = invertedIndexes.get(dimension); + for (IncrementalIndex.DimensionDesc dimension : dimensions) { + final int dimIndex = dimension.getIndex(); + final Map bitmapIndexes = invertedIndexes.get(dimension.getName()); if (bitmapIndexes == null || dims == null) { log.error("bitmapIndexes and dims are null!"); @@ -122,7 +125,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter @Override public Indexed getDimensionNames() { - return new ListIndexed(index.getDimensions(), String.class); + return new ListIndexed(index.getDimensionNames(), String.class); } @Override @@ -134,7 +137,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter @Override public Indexed getDimValueLookup(String dimension) { - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); + final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension); dimDim.sort(); return new Indexed() @@ -179,6 +182,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter @Override public Iterator iterator() { + final List dimensions = index.getDimensions(); /* * Note that the transform function increments a counter to determine the rowNum of * the iterated Rowboats. We need to return a new iterator on each @@ -200,9 +204,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter final int rowOffset = input.getValue(); int[][] dims = new int[dimValues.length][]; - for (String dimension : index.getDimensions()) { - int dimIndex = index.getDimensionIndex(dimension); - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); + for (IncrementalIndex.DimensionDesc dimension : dimensions) { + final int dimIndex = dimension.getIndex(); + final IncrementalIndex.DimDim dimDim = dimension.getValues(); dimDim.sort(); if (dimIndex >= dimValues.length || dimValues[dimIndex] == null) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 9f3b058e2b5..07143ee5139 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -94,7 +94,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public Indexed getAvailableDimensions() { - return new ListIndexed(index.getDimensions(), String.class); + return new ListIndexed(index.getDimensionNames(), String.class); } @Override @@ -109,7 +109,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (dimension.equals(Column.TIME_COLUMN_NAME)) { return Integer.MAX_VALUE; } - IncrementalIndex.DimDim dimDim = index.getDimension(dimension); + IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension); if (dimDim == null) { return 0; } @@ -312,7 +312,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn); } - final IncrementalIndex.DimDim dimValLookup = index.getDimension(dimension); + final IncrementalIndex.DimDim dimValLookup = index.getDimensionValues(dimension); if (dimValLookup == null) { return NULL_DIMENSION_SELECTOR; } @@ -611,7 +611,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (dimIndexObject == null) { return new BooleanValueMatcher(Strings.isNullOrEmpty(value)); } - final IncrementalIndex.DimDim dimDim = index.getDimension(dimension); + final IncrementalIndex.DimDim dimDim = index.getDimensionValues(dimension); if (!dimDim.contains(value)) { if (Strings.isNullOrEmpty(value)) { final int dimIndex = dimIndexObject; @@ -646,7 +646,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } for (String dimVal : dims[dimIndex]) { - if (dimDim.compareCannonicalValues(id, dimVal)) { + if (dimDim.compareCanonicalValues(id, dimVal)) { return true; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 9e9a05fd086..ad24713e923 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -293,7 +293,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex private DimDim getDimDim(int dimIndex) { - return getDimValues().get(getDimensions().get(dimIndex)); + return getDimensionValues(getDimensionNames().get(dimIndex)); } // MapDB forces serializers to implement serializable, which sucks @@ -470,7 +470,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex } } - public boolean compareCannonicalValues(String s1, String s2) + public boolean compareCanonicalValues(String s1, String s2) { return s1.equals(s2); } diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 87516bedfa7..4f220954fa5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -340,7 +340,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - public boolean compareCannonicalValues(String s1, String s2) + public boolean compareCanonicalValues(String s1, String s2) { return s1 == s2; } diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 804aeaa4a22..25287651564 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -194,7 +194,7 @@ public class IncrementalIndexTest long timestamp = System.currentTimeMillis(); IncrementalIndex index = indexCreator.createIndex(defaultAggregatorFactories); populateIndex(timestamp, index); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensions()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames()); Assert.assertEquals(2, index.size()); final Iterator rows = index.iterator(); @@ -456,7 +456,7 @@ public class IncrementalIndexTest } Assert.assertTrue(latch.await(60, TimeUnit.SECONDS)); - Assert.assertEquals(dimensionCount, index.getDimensions().size()); + Assert.assertEquals(dimensionCount, index.getDimensionNames().size()); Assert.assertEquals(elementsPerThread, index.size()); Iterator iterator = index.iterator(); int curr = 0; @@ -492,6 +492,6 @@ public class IncrementalIndexTest true, 1000000 ); - Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensions()); + Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); } }