From 747343e6212e0c8581e2f0ce7cd6a928fb2241e6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 19 Jan 2016 13:34:11 -0800 Subject: [PATCH] Preserve dimension order across indexes during ingestion --- .../io/druid/indexer/IndexGeneratorJob.java | 32 +- .../druid/indexer/IndexGeneratorJobTest.java | 135 ++++- .../main/java/io/druid/segment/IndexIO.java | 3 +- .../java/io/druid/segment/IndexMerger.java | 97 +++- .../java/io/druid/segment/IndexMergerV9.java | 43 +- .../segment/incremental/IncrementalIndex.java | 47 +- .../metadata/SegmentMetadataQueryTest.java | 4 +- .../io/druid/segment/IndexMergerTest.java | 358 ++++++++++++- .../io/druid/segment/IndexMergerV9Test.java | 506 +++++++++++++++++- .../QueryableIndexIndexableAdapterTest.java | 2 + .../druid/segment/realtime/plumber/Sink.java | 18 + .../plumber/RealtimePlumberSchoolTest.java | 206 ++++++- 12 files changed, 1321 insertions(+), 130 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 5baf2c05ddc..0a651f73369 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -69,6 +69,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -199,7 +201,8 @@ public class IndexGeneratorJob implements Jobby private static IncrementalIndex makeIncrementalIndex( Bucket theBucket, AggregatorFactory[] aggs, - HadoopDruidIndexerConfig config + HadoopDruidIndexerConfig config, + Iterable oldDimOrder ) { final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); @@ -210,10 +213,16 @@ public class IndexGeneratorJob implements Jobby .withMetrics(aggs) .build(); - return new OnheapIncrementalIndex( + OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex( indexSchema, tuningConfig.getRowFlushBoundary() ); + + if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { + newIndex.loadDimensionIterable(oldDimOrder); + } + + return newIndex; } public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper @@ -310,9 +319,10 @@ public class IndexGeneratorJob implements Jobby BytesWritable first = iter.next(); if (iter.hasNext()) { + LinkedHashSet dimOrder = Sets.newLinkedHashSet(); SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; - IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config); + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null); index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); while (iter.hasNext()) { @@ -320,9 +330,10 @@ public class IndexGeneratorJob implements Jobby InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); if (!index.canAppendRow()) { + dimOrder.addAll(index.getDimensionOrder()); log.info("current index full due to [%s]. 
creating new index.", index.getOutOfRowsReason()); flushIndexToContextAndClose(key, index, context); - index = makeIncrementalIndex(bucket, combiningAggs, config); + index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder); } index.add(value); @@ -523,7 +534,8 @@ public class IndexGeneratorJob implements Jobby IncrementalIndex index = makeIncrementalIndex( bucket, combiningAggs, - config + config, + null ); try { File baseFlushFile = File.createTempFile("base", "flush"); @@ -536,19 +548,20 @@ public class IndexGeneratorJob implements Jobby int runningTotalLineCount = 0; long startTime = System.currentTimeMillis(); - Set allDimensionNames = Sets.newHashSet(); + Set allDimensionNames = Sets.newLinkedHashSet(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); for (final BytesWritable bw : values) { context.progress(); final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators)); - allDimensionNames.addAll(inputRow.getDimensions()); int numRows = index.add(inputRow); ++lineCount; if (!index.canAppendRow()) { + allDimensionNames.addAll(index.getDimensionOrder()); + log.info(index.getOutOfRowsReason()); log.info( "%,d lines to %,d rows in %,d millis", @@ -569,13 +582,16 @@ public class IndexGeneratorJob implements Jobby index = makeIncrementalIndex( bucket, combiningAggs, - config + config, + allDimensionNames ); startTime = System.currentTimeMillis(); ++indexCount; } } + allDimensionNames.addAll(index.getDimensionOrder()); + log.info("%,d lines completed.", lineCount); List indexes = Lists.newArrayListWithCapacity(indexCount); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index ad30588297a..42dbeddba39 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -30,10 +30,12 @@ import com.metamx.common.Granularity; import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.segment.indexing.DataSchema; @@ -76,9 +78,18 @@ import java.util.Map; @RunWith(Parameterized.class) public class IndexGeneratorJobTest { + final private static AggregatorFactory[] aggs1 = { + new LongSumAggregatorFactory("visited_num", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }; - @Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " + - "inputFormatName={4}, buildV9Directly={5}") + final private static AggregatorFactory[] aggs2 = { + new CountAggregatorFactory("count") + }; + + @Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " + + "data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " + + "aggs={8}, datasourceName={9}, buildV9Directly={10}") public static Collection constructFeed() { final List baseConstructors = 
Arrays.asList( @@ -133,7 +144,10 @@ public class IndexGeneratorJobTest null, ImmutableList.of("timestamp", "host", "visited_num") ) - ) + ), + null, + aggs1, + "website" }, { false, @@ -175,7 +189,10 @@ public class IndexGeneratorJobTest null, ImmutableList.of("timestamp", "host", "visited_num") ) - ) + ), + null, + aggs1, + "website" }, { true, @@ -217,7 +234,10 @@ public class IndexGeneratorJobTest null, ImmutableList.of("timestamp", "host", "visited_num") ) - ) + ), + null, + aggs1, + "website" }, { false, @@ -269,7 +289,68 @@ public class IndexGeneratorJobTest null, ImmutableList.of("timestamp", "host", "visited_num") ) - ) + ), + null, + aggs1, + "website" + }, + { + // Tests that new indexes inherit the dimension order from previous index + false, + "hashed", + "2014-10-22T00:00:00Z/P1D", + new Integer[][][]{ + { + {0, 1} // use a single partition, dimension order inheritance is not supported across partitions + } + }, + ImmutableList.of( + "{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}", + "{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}", + "{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}", + "{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}", + "{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}", + "{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}" + ), + null, + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null) + ) + ), + 1, // force 1 row max per index for easier testing + aggs2, + "inherit_dims" + }, + { + // Tests that pre-specified dim order is maintained across indexes. + false, + "hashed", + "2014-10-22T00:00:00Z/P1D", + new Integer[][][]{ + { + {0, 1} + } + }, + ImmutableList.of( + "{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}", + "{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}", + "{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}", + "{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}", + "{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}", + "{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}" + ), + null, + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), null, null) + ) + ), + 1, // force 1 row max per index for easier testing + aggs2, + "inherit_dims2" } } ); @@ -300,6 +381,9 @@ public class IndexGeneratorJobTest private final List data; private final String inputFormatName; private final InputRowParser inputRowParser; + private final Integer maxRowsInMemory; + private final AggregatorFactory[] aggs; + private final String datasourceName; private final boolean buildV9Directly; private ObjectMapper mapper; @@ -315,8 +399,11 @@ public class IndexGeneratorJobTest List data, String inputFormatName, InputRowParser inputRowParser, + Integer maxRowsInMemory, + AggregatorFactory[] aggs, + String datasourceName, boolean buildV9Directly - ) throws IOException + ) throws IOException { this.useCombiner = useCombiner; this.partitionType = partitionType; @@ -325,6 +412,9 @@ public class IndexGeneratorJobTest this.data = data; this.inputFormatName = inputFormatName; this.inputRowParser = inputRowParser; + this.maxRowsInMemory = maxRowsInMemory; + this.aggs = aggs; + this.datasourceName = datasourceName; this.buildV9Directly = buildV9Directly; } @@ -381,15 +471,12 @@ public class IndexGeneratorJobTest config = new HadoopDruidIndexerConfig( new HadoopIngestionSpec( new DataSchema( - "website", + datasourceName, mapper.convertValue( inputRowParser, Map.class ), - new AggregatorFactory[]{ 
- new LongSumAggregatorFactory("visited_num", "visited_num"), - new HyperUniquesAggregatorFactory("unique_hosts", "host") - }, + aggs, new UniformGranularitySpec( Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) ), @@ -406,7 +493,7 @@ public class IndexGeneratorJobTest null, null, null, - null, + maxRowsInMemory, false, false, false, @@ -500,15 +587,29 @@ public class IndexGeneratorJobTest Assert.assertTrue(indexZip.exists()); DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class); - Assert.assertEquals("website", dataSegment.getDataSource()); Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval()); Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); - Assert.assertEquals("host", dataSegment.getDimensions().get(0)); - Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); - Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); + + if (datasourceName.equals("website")) { + Assert.assertEquals("website", dataSegment.getDataSource()); + Assert.assertEquals("host", dataSegment.getDimensions().get(0)); + Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + } else if (datasourceName.equals("inherit_dims")) { + Assert.assertEquals("inherit_dims", dataSegment.getDataSource()); + Assert.assertEquals(ImmutableList.of("X", "Y", "M", "Q", "B", "F"), dataSegment.getDimensions()); + Assert.assertEquals("count", dataSegment.getMetrics().get(0)); + } else if (datasourceName.equals("inherit_dims2")) { + Assert.assertEquals("inherit_dims2", dataSegment.getDataSource()); + Assert.assertEquals(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), dataSegment.getDimensions()); + Assert.assertEquals("count", dataSegment.getMetrics().get(0)); + } else { + Assert.fail("Test did not specify supported datasource name"); + } + if (partitionType.equals("hashed")) { Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum]; HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index d1c128bc038..107df523578 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -610,6 +610,7 @@ public class IndexIO continue; } + int emptyStrIdx = dictionary.indexOf(""); List singleValCol = null; VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); GenericIndexed bitmaps = bitmapIndexes.get(dimension); @@ -626,7 +627,7 @@ public class IndexIO if (rowValue.size() > 1) { onlyOneValue = false; } - if (rowValue.size() == 0) { + if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) { if (nullsSet == null) { nullsSet = bitmapFactory.makeEmptyMutableBitmap(); } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 7659a07f53d..593fa40c082 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -71,6 +71,7 @@ import 
io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedRTree; +import io.druid.segment.data.ListIndexed; import io.druid.segment.data.TmpFileIOPeon; import io.druid.segment.data.VSizeIndexedWriter; import io.druid.segment.incremental.IncrementalIndex; @@ -104,6 +105,7 @@ public class IndexMerger { private static final Logger log = new Logger(IndexMerger.class); + protected static final ListIndexed EMPTY_STR_DIM_VAL = new ListIndexed<>(Arrays.asList(""), String.class); protected static final SerializerUtils serializerUtils = new SerializerUtils(); protected static final int INVALID_ROW = -1; protected static final Splitter SPLITTER = Splitter.on(","); @@ -269,19 +271,53 @@ public class IndexMerger ); } + private static List getLongestSharedDimOrder(List indexes) + { + int maxSize = 0; + Iterable orderingCandidate = null; + for (IndexableAdapter index : indexes) { + int iterSize = index.getDimensionNames().size(); + if (iterSize > maxSize) { + maxSize = iterSize; + orderingCandidate = index.getDimensionNames(); + } + } + + if (orderingCandidate == null) { + return null; + } + + for (IndexableAdapter index : indexes) { + Iterator candidateIter = orderingCandidate.iterator(); + for (String matchDim : index.getDimensionNames()) { + boolean matched = false; + while (candidateIter.hasNext()) { + String nextDim = candidateIter.next(); + if (matchDim.equals(nextDim)) { + matched = true; + break; + } + } + if (!matched) { + return null; + } + } + } + return ImmutableList.copyOf(orderingCandidate); + } + public static List getMergedDimensions(List indexes) { if (indexes.size() == 0) { return ImmutableList.of(); } - Indexed dimOrder = indexes.get(0).getDimensionNames(); - for (IndexableAdapter index : indexes) { - Indexed dimOrder2 = index.getDimensionNames(); - if (!Iterators.elementsEqual(dimOrder.iterator(), dimOrder2.iterator())) { - return getLexicographicMergedDimensions(indexes); - } + List commonDimOrder = getLongestSharedDimOrder(indexes); + if (commonDimOrder == null) { + log.warn("Indexes have incompatible dimension orders, using lexicographic order."); + return getLexicographicMergedDimensions(indexes); + } else { + return commonDimOrder; } - return ImmutableList.copyOf(dimOrder); } public File merge( @@ -602,13 +638,35 @@ public class IndexMerger ); writer.open(); - List> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size()); + List> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1); DimValueConverter[] converters = new DimValueConverter[indexes.size()]; + boolean dimHasValues = false; + boolean[] dimHasValuesByIndex = new boolean[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) { Indexed dimValues = indexes.get(i).getDimValueLookup(dimension); if (!isNullColumn(dimValues)) { + dimHasValues = true; + dimHasValuesByIndex[i] = true; dimValueLookups.add(dimValues); converters[i] = new DimValueConverter(dimValues); + } else { + dimHasValuesByIndex[i] = false; + } + } + + /* + * Ensure the empty str is always in the dictionary if column is not null across indexes + * This is done so that MMappedIndexRowIterable can convert null columns to empty strings + * later on, to allow rows from indexes with no values at all for a dimension to merge correctly with + * rows from indexes with partial null values for that dimension. 
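+   * For example, if one index has values for a dimension and another index lacks that column
+   * entirely, the merged dictionary still contains "", so rows from the second index can be
+   * rewritten to reference the empty-string entry instead of a missing column.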
+ */ + if (dimHasValues) { + dimValueLookups.add(EMPTY_STR_DIM_VAL); + for (int i = 0; i < indexes.size(); i++) { + if (!dimHasValuesByIndex[i]) { + converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL); + } } } @@ -652,6 +710,7 @@ public class IndexMerger ++count; } + dimensionCardinalities.put(dimension, count); FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); @@ -725,8 +784,9 @@ public class IndexMerger ); } } - ) - , mergedDimensions, dimConversions.get(i), i + ), + mergedDimensions, dimConversions.get(i), i, + dimensionCardinalities ) ); } @@ -944,7 +1004,6 @@ public class IndexMerger ByteStreams.copy(spatialWriter.combineStreams(), spatialOut); spatialIoPeon.cleanup(); } - } log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); @@ -1161,18 +1220,22 @@ public class IndexMerger private final List convertedDims; private final Map converters; private final int indexNumber; + private final Map dimCardinalities; + private static final int[] EMPTY_STR_DIM = new int[]{0}; MMappedIndexRowIterable( Iterable index, List convertedDims, Map converters, - int indexNumber + int indexNumber, + Map dimCardinalities ) { this.index = index; this.convertedDims = convertedDims; this.converters = converters; this.indexNumber = indexNumber; + this.dimCardinalities = dimCardinalities; } public Iterable getIndex() @@ -1206,12 +1269,20 @@ public class IndexMerger int[][] newDims = new int[convertedDims.size()][]; for (int i = 0; i < convertedDims.size(); ++i) { IntBuffer converter = converterArray[i]; + String dimName = convertedDims.get(i); if (converter == null) { continue; } - if (i >= dims.length || dims[i] == null) { + if (i >= dims.length) { + continue; + } + + if (dims[i] == null) { + if (dimCardinalities.get(dimName) > 0) { + newDims[i] = EMPTY_STR_DIM; + } continue; } diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index a5ab3ccc249..db111ca2583 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -183,7 +183,7 @@ public class IndexMergerV9 extends IndexMerger /************* Walk through data sets, merge them, and write merged columns *************/ progress.progress(); final Iterable theRows = makeRowIterable( - adapters, mergedDimensions, mergedMetrics, dimConversions, rowMergerFn + adapters, mergedDimensions, mergedMetrics, dimConversions, dimCardinalities, rowMergerFn ); final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon); final ArrayList dimWriters = setupDimensionWriters( @@ -267,8 +267,8 @@ public class IndexMergerV9 extends IndexMerger progress.startSection(section); long startTime = System.currentTimeMillis(); - final Set finalColumns = Sets.newTreeSet(); - final Set finalDimensions = Sets.newTreeSet(); + final Set finalDimensions = Sets.newLinkedHashSet(); + final Set finalColumns = Sets.newLinkedHashSet(); finalColumns.addAll(mergedMetrics); for (int i = 0; i < mergedDimensions.size(); ++i) { if (dimensionSkipFlag.get(i)) { @@ -665,7 +665,7 @@ public class IndexMergerV9 extends IndexMerger if (dimensionSkipFlag.get(i)) { continue; } - if (dims[i] == null || dims[i].length == 0) { + if (dims[i] == null || dims[i].length == 0 || (dims[i].length == 1 && dims[i][0] == 0)) { nullRowsList.get(i).add(rowCount); } dimWriters.get(i).add(dims[i]); @@ -778,6 +778,7 @@ public class IndexMergerV9 extends 
IndexMerger final List mergedDimensions, final List mergedMetrics, final ArrayList> dimConversions, + final Map dimCardinalities, final Function>, Iterable> rowMergerFn ) { @@ -834,7 +835,8 @@ public class IndexMergerV9 extends IndexMerger ), mergedDimensions, dimConversions.get(i), - i + i, + dimCardinalities ) ); } @@ -886,32 +888,39 @@ public class IndexMergerV9 extends IndexMerger // each converter converts dim values of this dimension to global dictionary DimValueConverter[] converters = new DimValueConverter[adapters.size()]; - boolean existNullColumn = false; + boolean dimHasValues = false; + boolean[] dimHasValuesByIndex = new boolean[adapters.size()]; for (int i = 0; i < adapters.size(); i++) { Indexed dimValues = adapters.get(i).getDimValueLookup(dimension); if (!isNullColumn(dimValues)) { + dimHasValues = true; + dimHasValuesByIndex[i] = true; dimValueLookups.add(dimValues); converters[i] = new DimValueConverter(dimValues); } else { - existNullColumn = true; + dimHasValuesByIndex[i] = false; } } - Iterable> bumpedDimValueLookups; - if (!dimValueLookups.isEmpty() && existNullColumn) { - log.info("dim[%s] are null in some indexes, append null value to dim values", dimension); - bumpedDimValueLookups = Iterables.concat( - Arrays.asList(new ArrayIndexed<>(new String[]{null}, String.class)), - dimValueLookups - ); - } else { - bumpedDimValueLookups = dimValueLookups; + /* + * Ensure the empty str is always in the dictionary if column is not null across indexes + * This is done so that MMappedIndexRowIterable can convert null columns to empty strings + * later on, to allow rows from indexes with no values at all for a dimension to merge correctly with + * rows from indexes with partial null values for that dimension. + */ + if (dimHasValues) { + dimValueLookups.add(EMPTY_STR_DIM_VAL); + for (int i = 0; i < adapters.size(); i++) { + if (!dimHasValuesByIndex[i]) { + converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL); + } + } } // sort all dimension values and treat all null values as empty strings Iterable dimensionValues = CombiningIterable.createSplatted( Iterables.transform( - bumpedDimValueLookups, + dimValueLookups, new Function, Iterable>() { @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 1c0e1d0c1fe..fadb9414ee3 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -401,10 +401,10 @@ public abstract class IncrementalIndex implements Iterable, /** * Adds a new row. The row might correspond with another row that already exists, in which case this will * update that row instead of inserting a new one. - *
<p/>
- * <p/>
+ * <p>
+ * <p>
* Calls to add() are thread safe.
- * <p/>
+ * <p>
* * @param row the row of data to add * @@ -599,6 +599,36 @@ public abstract class IncrementalIndex implements Iterable, return dimSpec == null ? null : dimSpec.getIndex(); } + public List getDimensionOrder() + { + synchronized (dimensionDescs) { + return ImmutableList.copyOf(dimensionDescs.keySet()); + } + } + + /* + * Currently called to initialize IncrementalIndex dimension order during index creation + * Index dimension ordering could be changed to initalize from DimensionsSpec after resolution of + * https://github.com/druid-io/druid/issues/2011 + */ + public void loadDimensionIterable(Iterable oldDimensionOrder) + { + synchronized (dimensionDescs) { + if (!dimensionDescs.isEmpty()) { + throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionDescs.keySet()); + } + for (String dim : oldDimensionOrder) { + if (dimensionDescs.get(dim) == null) { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.setType(ValueType.STRING); + columnCapabilities.put(dim, capabilities); + DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities); + dimensionDescs.put(dim, desc); + } + } + } + } + public List getMetricNames() { return ImmutableList.copyOf(metricDescs.keySet()); @@ -903,13 +933,10 @@ public abstract class IncrementalIndex implements Iterable, public int compareTo(TimeAndDims rhs) { int retVal = Longs.compare(timestamp, rhs.timestamp); - - if (retVal == 0) { - retVal = Ints.compare(dims.length, rhs.dims.length); - } + int numComparisons = Math.min(dims.length, rhs.dims.length); int index = 0; - while (retVal == 0 && index < dims.length) { + while (retVal == 0 && index < numComparisons) { String[] lhsVals = dims[index]; String[] rhsVals = rhs.dims[index]; @@ -935,6 +962,10 @@ public abstract class IncrementalIndex implements Iterable, ++index; } + if (retVal == 0) { + return Ints.compare(dims.length, rhs.dims.length); + } + return retVal; } diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java index 5a2cfd5f72a..bd4ddc78719 100644 --- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java @@ -104,7 +104,7 @@ public class SegmentMetadataQueryTest new ColumnAnalysis( ValueType.STRING.toString(), 10881, - 1, + 2, null ) ), 71982, @@ -135,7 +135,7 @@ public class SegmentMetadataQueryTest new ColumnAnalysis( ValueType.STRING.toString(), 21762, - 1, + 2, null ) ), diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 514980cde0b..c36616867ae 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -41,7 +42,9 @@ import io.druid.segment.data.IndexedInts; import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.incremental.IncrementalIndex; import 
io.druid.segment.incremental.IncrementalIndexAdapter; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OnheapIncrementalIndex; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -59,9 +62,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +@Deprecated @RunWith(Parameterized.class) public class IndexMergerTest { + // Deprecated, use IndexMergerV9Test instead @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -202,8 +207,8 @@ public class IndexMergerTest } Assert.assertEquals(2, boatList.size()); - Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); - Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims()); checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("dim1", "")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1")); @@ -445,7 +450,7 @@ public class IndexMergerTest final QueryableIndex index2 = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.persist( - toPersist1, + toPersist2, tmpDir2, indexSpec ) @@ -468,7 +473,7 @@ public class IndexMergerTest Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); - Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); @@ -493,8 +498,8 @@ public class IndexMergerTest ); QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MERGER.append( + INDEX_IO.loadIndex( + INDEX_MERGER.append( ImmutableList.of(incrementalAdapter), null, tempDir1, indexSpec ) ) @@ -830,22 +835,111 @@ public class IndexMergerTest Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(3, boatList.size()); - Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims()); - Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); - Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims()); - Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); - Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics()); } + @Test + public void testMergeWithDimensionsList() throws Exception + { + IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withDimensionsSpec(new DimensionsSpec(Arrays.asList("dimA", "dimB", "dimC"), null, null)) + .withMinTimestamp(0L) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(new 
AggregatorFactory[]{new CountAggregatorFactory("count")}) + .build(); + + + IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000); + IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000); + + addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersist3, "dimC", Arrays.asList("1", "2")); + + + final File tmpDir = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex index1 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist1, + tmpDir, + indexSpec + ) + ) + ); + + QueryableIndex index2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist2, + tmpDir2, + indexSpec + ) + ) + ); + + QueryableIndex index3 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist3, + tmpDir3, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(index1, index2, index3), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = new ArrayList<>(); + for (Rowboat boat : boats) { + boatList.add(boat); + } + + Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(4, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics()); + } + + @Test public void testDisjointDimMerge() throws Exception { IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + IncrementalIndex toPersistB2 = getIndexWithDims(Arrays.asList("dimA", "dimB")); + addDimValuesToIndex(toPersistB2, "dimB", Arrays.asList("1", "2", "3")); final File tmpDirA = temporaryFolder.newFolder(); final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirB2 = temporaryFolder.newFolder(); final File tmpDirMerged = temporaryFolder.newFolder(); QueryableIndex indexA = closer.closeLater( @@ -868,6 +962,16 @@ public class IndexMergerTest ) ); + QueryableIndex indexB2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB2, + tmpDirB2, + indexSpec + ) + ) + ); + final QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( @@ -879,12 +983,22 @@ public class IndexMergerTest ) ); + final QueryableIndex merged2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB2), + new 
AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + List boatList = ImmutableList.copyOf(adapter.getRows()); + + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + List boatList2 = ImmutableList.copyOf(adapter2.getRows()); Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(5, boatList.size()); @@ -899,6 +1013,19 @@ public class IndexMergerTest Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims()); Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics()); + Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter2.getDimensionNames())); + Assert.assertEquals(5, boatList2.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList2.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList2.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList2.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList2.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList2.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics()); + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("dimA", "")); checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimA", "1")); checkBitmapIndex(Lists.newArrayList(4), adapter.getBitmapIndex("dimA", "2")); @@ -1017,10 +1144,10 @@ public class IndexMergerTest ImmutableList.copyOf(adapter.getDimensionNames()) ); Assert.assertEquals(4, boatList.size()); - Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims()); - Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims()); - Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims()); - Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims()); checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); @@ -1063,6 +1190,174 @@ public class IndexMergerTest } } + @Test + public void testMergeWithSupersetOrdering() throws Exception + { + IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); + IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + + IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", 
"3")); + addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2")); + + IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); + + toPersistBA2.add( + new MapBasedInputRow( + 1, + Arrays.asList("dimB", "dimA"), + ImmutableMap.of("dimB", "1", "dimA", "") + ) + ); + + toPersistBA2.add( + new MapBasedInputRow( + 1, + Arrays.asList("dimB", "dimA"), + ImmutableMap.of("dimB", "", "dimA", "1") + ) + ); + + + IncrementalIndex toPersistC = getSingleDimIndex("dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersistC, "dimC", Arrays.asList("1", "2", "3")); + + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirBA = temporaryFolder.newFolder(); + final File tmpDirBA2 = temporaryFolder.newFolder(); + final File tmpDirC = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + final File tmpDirMerged2 = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistA, + tmpDirA, + indexSpec + ) + ) + ); + + QueryableIndex indexB = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB, + tmpDirB, + indexSpec + ) + ) + ); + + QueryableIndex indexBA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistBA, + tmpDirBA, + indexSpec + ) + ) + ); + + QueryableIndex indexBA2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistBA2, + tmpDirBA2, + indexSpec + ) + ) + ); + + QueryableIndex indexC = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistC, + tmpDirC, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexBA, indexBA2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final QueryableIndex merged2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexBA, indexC), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged2, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = ImmutableList.copyOf(boats); + + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + Iterable boats2 = adapter2.getRows(); + List boatList2 = ImmutableList.copyOf(boats2); + + Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(5, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{3}, {0}}, boatList.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, 
boatList.get(4).getMetrics()); + + Assert.assertEquals(ImmutableList.of("dimA", "dimB", "dimC"), ImmutableList.copyOf(adapter2.getDimensionNames())); + Assert.assertEquals(12, boatList2.size()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}}, boatList2.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList2.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {3}}, boatList2.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics()); + + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(5).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(5).getMetrics()); + + Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}}, boatList2.get(6).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList2.get(6).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(7).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(7).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(8).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(8).getMetrics()); + + Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(9).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(9).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(10).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(10).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(11).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList2.get(11).getMetrics()); + } + private IncrementalIndex getIndexD3() throws Exception { IncrementalIndex toPersist1 = new OnheapIncrementalIndex( @@ -1108,8 +1403,14 @@ public class IndexMergerTest 1000 ); + addDimValuesToIndex(toPersist1, dimName, values); + return toPersist1; + } + + private void addDimValuesToIndex(IncrementalIndex index, String dimName, List values) throws Exception + { for (String val : values) { - toPersist1.add( + index.add( new MapBasedInputRow( 1, Arrays.asList(dimName), @@ -1117,8 +1418,21 @@ public class IndexMergerTest ) ); } + } - return toPersist1; + private IncrementalIndex getIndexWithDims(List dims) + { + IncrementalIndexSchema schema = new IncrementalIndexSchema( + 0L, + QueryGranularity.NONE, + new DimensionsSpec(dims, null, null), + new AggregatorFactory[]{new CountAggregatorFactory("count")} + ); + + return new OnheapIncrementalIndex( + schema, + 1000 + ); } private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java b/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java index 8930c28e242..b4b6275f930 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import 
com.google.common.collect.Sets; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -41,6 +42,7 @@ import io.druid.segment.data.IndexedInts; import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.junit.Assert; import org.junit.Rule; @@ -64,16 +66,22 @@ public class IndexMergerV9Test @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - protected final static IndexMergerV9 INDEX_MERGER = TestHelper.getTestIndexMergerV9(); + protected IndexMerger INDEX_MERGER; private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO(); - @Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}") + @Parameterized.Parameters(name = "{index}: useV9={0}, bitmap={1}, metric compression={2}, dimension compression={3}") public static Collection data() { return Collections2.transform( Sets.cartesianProduct( ImmutableList.of( - ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()), + ImmutableSet.of( + true, + false + ), + ImmutableSet.of( + new RoaringBitmapSerdeFactory(), + new ConciseBitmapSerdeFactory()), ImmutableSet.of( CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED, CompressedObjectStrategy.CompressionStrategy.LZ4, @@ -119,12 +127,18 @@ public class IndexMergerV9Test public final CloserRule closer = new CloserRule(false); public IndexMergerV9Test( + boolean useV9, BitmapSerdeFactory bitmapSerdeFactory, CompressedObjectStrategy.CompressionStrategy compressionStrategy, CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy ) { this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy); + if (useV9) { + INDEX_MERGER = TestHelper.getTestIndexMergerV9(); + } else { + INDEX_MERGER = TestHelper.getTestIndexMerger(); + } } @Test @@ -196,8 +210,8 @@ public class IndexMergerV9Test } Assert.assertEquals(2, boatList.size()); - Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); - Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims()); checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("dim1", "")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1")); @@ -364,7 +378,7 @@ public class IndexMergerV9Test final QueryableIndex index2 = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.persist( - toPersist1, + toPersist2, tmpDir2, indexSpec ) @@ -387,7 +401,7 @@ public class IndexMergerV9Test Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); - Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); assertDimCompression(index1, 
indexSpec.getDimensionCompressionStrategy()); @@ -452,7 +466,6 @@ public class IndexMergerV9Test assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); } - @Test public void testAppendRetainsValues() throws Exception { @@ -716,6 +729,7 @@ public class IndexMergerV9Test compressedSupplierField.setAccessible(true); Object supplier = compressedSupplierField.get(obj); + Field compressionField = supplier.getClass().getDeclaredField("compression"); compressionField.setAccessible(true); @@ -724,14 +738,201 @@ public class IndexMergerV9Test Assert.assertEquals(expectedStrategy, strategy); } + + @Test + public void testNonLexicographicDimOrderMerge() throws Exception + { + IncrementalIndex toPersist1 = getIndexD3(); + IncrementalIndex toPersist2 = getIndexD3(); + IncrementalIndex toPersist3 = getIndexD3(); + + final File tmpDir = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex index1 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist1, + tmpDir, + indexSpec + ) + ) + ); + + QueryableIndex index2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist2, + tmpDir2, + indexSpec + ) + ) + ); + + QueryableIndex index3 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist3, + tmpDir3, + indexSpec + ) + ) + ); + + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(index1, index2, index3), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = new ArrayList<>(); + for (Rowboat boat : boats) { + boatList.add(boat); + } + + Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(3, boatList.size()); + Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics()); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("d3", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d3", "30000")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d3", "40000")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d3", "50000")); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("d1", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d1", "100")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d1", "200")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d1", "300")); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("d2", "")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "2000")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d2", "3000")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d2", "4000")); + + } + + @Test + public void 
testMergeWithDimensionsList() throws Exception + { + IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withDimensionsSpec(new DimensionsSpec(Arrays.asList("dimA", "dimB", "dimC"), null, null)) + .withMinTimestamp(0L) + .withQueryGranularity(QueryGranularity.NONE) + .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .build(); + + + IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000); + IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000); + IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000); + + addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersist3, "dimC", Arrays.asList("1", "2")); + + + final File tmpDir = temporaryFolder.newFolder(); + final File tmpDir2 = temporaryFolder.newFolder(); + final File tmpDir3 = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + + QueryableIndex index1 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist1, + tmpDir, + indexSpec + ) + ) + ); + + QueryableIndex index2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist2, + tmpDir2, + indexSpec + ) + ) + ); + + QueryableIndex index3 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersist3, + tmpDir3, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(index1, index2, index3), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable boats = adapter.getRows(); + List boatList = new ArrayList<>(); + for (Rowboat boat : boats) { + boatList.add(boat); + } + + Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(4, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics()); + + checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dimA", "")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimA", "1")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimA", "2")); + + checkBitmapIndex(new ArrayList(), adapter.getBitmapIndex("dimB", "")); + + checkBitmapIndex(Lists.newArrayList(2, 3), adapter.getBitmapIndex("dimC", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimC", "1")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimC", "2")); + } + + @Test public void testDisjointDimMerge() throws Exception { IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + IncrementalIndex toPersistB2 = 
getIndexWithDims(Arrays.asList("dimA", "dimB")); + addDimValuesToIndex(toPersistB2, "dimB", Arrays.asList("1", "2", "3")); final File tmpDirA = temporaryFolder.newFolder(); final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirB2 = temporaryFolder.newFolder(); final File tmpDirMerged = temporaryFolder.newFolder(); QueryableIndex indexA = closer.closeLater( @@ -754,6 +955,16 @@ public class IndexMergerV9Test ) ); + QueryableIndex indexB2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB2, + tmpDirB2, + indexSpec + ) + ) + ); + final QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( @@ -765,12 +976,22 @@ public class IndexMergerV9Test ) ); + final QueryableIndex merged2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); - Iterable boats = adapter.getRows(); - List boatList = new ArrayList<>(); - for (Rowboat boat : boats) { - boatList.add(boat); - } + List boatList = ImmutableList.copyOf(adapter.getRows()); + + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + List boatList2 = ImmutableList.copyOf(adapter2.getRows()); Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(5, boatList.size()); @@ -793,6 +1014,28 @@ public class IndexMergerV9Test checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimB", "1")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimB", "2")); checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimB", "3")); + + Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter2.getDimensionNames())); + Assert.assertEquals(5, boatList2.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList2.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList2.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList2.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList2.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList2.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics()); + + checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter2.getBitmapIndex("dimA", "")); + checkBitmapIndex(Lists.newArrayList(3), adapter2.getBitmapIndex("dimA", "1")); + checkBitmapIndex(Lists.newArrayList(4), adapter2.getBitmapIndex("dimA", "2")); + + checkBitmapIndex(Lists.newArrayList(3, 4), adapter2.getBitmapIndex("dimB", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter2.getBitmapIndex("dimB", "1")); + checkBitmapIndex(Lists.newArrayList(1), adapter2.getBitmapIndex("dimB", "2")); + checkBitmapIndex(Lists.newArrayList(2), adapter2.getBitmapIndex("dimB", "3")); } @Test @@ -903,10 +1146,10 @@ public class IndexMergerV9Test ImmutableList.copyOf(adapter.getDimensionNames()) ); Assert.assertEquals(4, boatList.size()); - Assert.assertArrayEquals(new int[][]{{0}, {1}, 
{0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims()); - Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims()); - Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims()); - Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims()); checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); @@ -949,6 +1192,199 @@ public class IndexMergerV9Test } } + @Test + public void testMergeWithSupersetOrdering() throws Exception + { + IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); + IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + + IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); + addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2")); + + IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex( + 0L, + QueryGranularity.NONE, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + 1000 + ); + + toPersistBA2.add( + new MapBasedInputRow( + 1, + Arrays.asList("dimB", "dimA"), + ImmutableMap.of("dimB", "1", "dimA", "") + ) + ); + + toPersistBA2.add( + new MapBasedInputRow( + 1, + Arrays.asList("dimB", "dimA"), + ImmutableMap.of("dimB", "", "dimA", "1") + ) + ); + + + IncrementalIndex toPersistC = getSingleDimIndex("dimA", Arrays.asList("1", "2")); + addDimValuesToIndex(toPersistC, "dimC", Arrays.asList("1", "2", "3")); + + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirBA = temporaryFolder.newFolder(); + final File tmpDirBA2 = temporaryFolder.newFolder(); + final File tmpDirC = temporaryFolder.newFolder(); + final File tmpDirMerged = temporaryFolder.newFolder(); + final File tmpDirMerged2 = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistA, + tmpDirA, + indexSpec + ) + ) + ); + + QueryableIndex indexB = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistB, + tmpDirB, + indexSpec + ) + ) + ); + + QueryableIndex indexBA = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistBA, + tmpDirBA, + indexSpec + ) + ) + ); + + QueryableIndex indexBA2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistBA2, + tmpDirBA2, + indexSpec + ) + ) + ); + + QueryableIndex indexC = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.persist( + toPersistC, + tmpDirC, + indexSpec + ) + ) + ); + + final QueryableIndex merged = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexBA, indexBA2), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged, + indexSpec + ) + ) + ); + + final QueryableIndex merged2 = closer.closeLater( + INDEX_IO.loadIndex( + INDEX_MERGER.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexBA, indexC), + new
AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged2, + indexSpec + ) + ) + ); + + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + Iterable<Rowboat> boats = adapter.getRows(); + List<Rowboat> boatList = ImmutableList.copyOf(boats); + + final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2); + Iterable<Rowboat> boats2 = adapter2.getRows(); + List<Rowboat> boatList2 = ImmutableList.copyOf(boats2); + + Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames())); + Assert.assertEquals(5, boatList.size()); + Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{3}, {0}}, boatList.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList.get(4).getMetrics()); + + checkBitmapIndex(Lists.newArrayList(2, 3, 4), adapter.getBitmapIndex("dimA", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimA", "1")); + checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimA", "2")); + + checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dimB", "")); + checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimB", "1")); + checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimB", "2")); + checkBitmapIndex(Lists.newArrayList(4), adapter.getBitmapIndex("dimB", "3")); + + + Assert.assertEquals(ImmutableList.of("dimA", "dimB", "dimC"), ImmutableList.copyOf(adapter2.getDimensionNames())); + Assert.assertEquals(12, boatList2.size()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}}, boatList2.get(0).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList2.get(1).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {0}, {3}}, boatList2.get(2).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics()); + + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(3).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(4).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(5).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(5).getMetrics()); + + Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}}, boatList2.get(6).getDims()); + Assert.assertArrayEquals(new Object[]{3L}, boatList2.get(6).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(7).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(7).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(8).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(8).getMetrics()); + +
Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(9).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(9).getMetrics()); + Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(10).getDims()); + Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(10).getMetrics()); + Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(11).getDims()); + Assert.assertArrayEquals(new Object[]{2L}, boatList2.get(11).getMetrics()); + + checkBitmapIndex(Lists.newArrayList(0, 1, 2, 3, 4, 5, 8, 9, 10), adapter2.getBitmapIndex("dimA", "")); + checkBitmapIndex(Lists.newArrayList(6), adapter2.getBitmapIndex("dimA", "1")); + checkBitmapIndex(Lists.newArrayList(7, 11), adapter2.getBitmapIndex("dimA", "2")); + + checkBitmapIndex(Lists.newArrayList(0, 1, 2, 6, 7, 11), adapter2.getBitmapIndex("dimB", "")); + checkBitmapIndex(Lists.newArrayList(3, 8), adapter2.getBitmapIndex("dimB", "1")); + checkBitmapIndex(Lists.newArrayList(4, 9), adapter2.getBitmapIndex("dimB", "2")); + checkBitmapIndex(Lists.newArrayList(5, 10), adapter2.getBitmapIndex("dimB", "3")); + + checkBitmapIndex(Lists.newArrayList(3, 4, 5, 6, 7, 8, 9, 10, 11), adapter2.getBitmapIndex("dimC", "")); + checkBitmapIndex(Lists.newArrayList(0), adapter2.getBitmapIndex("dimC", "1")); + checkBitmapIndex(Lists.newArrayList(1), adapter2.getBitmapIndex("dimC", "2")); + checkBitmapIndex(Lists.newArrayList(2), adapter2.getBitmapIndex("dimC", "3")); + + } + private IncrementalIndex getIndexD3() throws Exception { IncrementalIndex toPersist1 = new OnheapIncrementalIndex( @@ -960,7 +1396,7 @@ public class IndexMergerV9Test toPersist1.add( new MapBasedInputRow( - 0, + 1, Arrays.asList("d3", "d1", "d2"), ImmutableMap.of("d1", "100", "d2", "4000", "d3", "30000") ) @@ -968,17 +1404,17 @@ public class IndexMergerV9Test toPersist1.add( new MapBasedInputRow( - 0, + 1, Arrays.asList("d3", "d1", "d2"), - ImmutableMap.of("d1", "200", "d2", "3000", "d3", "50000") + ImmutableMap.of("d1", "300", "d2", "2000", "d3", "40000") ) ); toPersist1.add( new MapBasedInputRow( - 0, + 1, Arrays.asList("d3", "d1", "d2"), - ImmutableMap.of("d1", "300", "d2", "2000", "d3", "40000") + ImmutableMap.of("d1", "200", "d2", "3000", "d3", "50000") ) ); @@ -994,8 +1430,14 @@ public class IndexMergerV9Test 1000 ); + addDimValuesToIndex(toPersist1, dimName, values); + return toPersist1; + } + + private void addDimValuesToIndex(IncrementalIndex index, String dimName, List<String> values) throws Exception + { for (String val : values) { - toPersist1.add( + index.add( new MapBasedInputRow( 1, Arrays.asList(dimName), @@ -1003,7 +1445,21 @@ ) ); } - - return toPersist1; } + + private IncrementalIndex getIndexWithDims(List<String> dims) + { + IncrementalIndexSchema schema = new IncrementalIndexSchema( + 0L, + QueryGranularity.NONE, + new DimensionsSpec(dims, null, null), + new AggregatorFactory[]{new CountAggregatorFactory("count")} + ); + + return new OnheapIncrementalIndex( + schema, + 1000 + ); + } + } diff --git a/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java index dbaa6f5eb31..5e298a568ee 100644 --- a/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java @@ -74,6 +74,8 @@ public class QueryableIndexIndexableAdapterTest { IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1"); + IndexedInts indexedIntsNull = bitmapIndexSeeker.seek(null); + Assert.assertEquals(0, indexedIntsNull.size()); IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0"); Assert.assertEquals(0, indexedInts0.size()); IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1"); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index ca3951e2c93..b84217bcb75 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -24,10 +24,13 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.QueryableIndex; +import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; @@ -41,6 +44,8 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -55,6 +60,7 @@ public class Sink implements Iterable<FireHydrant> private final RealtimeTuningConfig config; private final String version; private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>(); + private final LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet(); private volatile FireHydrant currHydrant; public Sink( @@ -204,6 +210,18 @@ if (numHydrants > 0) { FireHydrant lastHydrant = hydrants.get(numHydrants - 1); newCount = lastHydrant.getCount() + 1; + if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) { + if (lastHydrant.hasSwapped()) { + QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex(); + for (String dim : oldIndex.getAvailableDimensions()) { + dimOrder.add(dim); + } + } else { + IncrementalIndex oldIndex = lastHydrant.getIndex(); + dimOrder.addAll(oldIndex.getDimensionOrder()); + } + newIndex.loadDimensionIterable(dimOrder); + } } currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); hydrants.add(currHydrant); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index e59eaf85383..ff14c0dddf5 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -21,6 +21,7 @@ package io.druid.segment.realtime.plumber; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; @@ -43,6 +44,7 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.SegmentDescriptor; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -71,9 +73,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** */ @@ -245,7 +247,6 @@ public class RealtimePlumberSchoolTest private void testPersist(final Object commitMetadata) throws Exception { - final AtomicBoolean committed = new AtomicBoolean(false); plumber.getSinks() .put( 0L, @@ -262,6 +263,9 @@ EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.replay(row); + + final CountDownLatch doneSignal = new CountDownLatch(1); + final Committer committer = new Committer() { @Override @@ -273,15 +277,14 @@ @Override public void run() { - committed.set(true); + doneSignal.countDown(); } }; plumber.add(row, Suppliers.ofInstance(committer)); plumber.persist(committer); - while (!committed.get()) { - Thread.sleep(100); - } + doneSignal.await(); + plumber.getSinks().clear(); plumber.finishJob(); } @@ -289,7 +292,6 @@ @Test(timeout = 60000) public void testPersistFails() throws Exception { - final AtomicBoolean committed = new AtomicBoolean(false); plumber.getSinks() .put( 0L, @@ -306,6 +308,9 @@ EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.replay(row); plumber.add(row, Committers.supplierOf(Committers.nil())); + + final CountDownLatch doneSignal = new CountDownLatch(1); + plumber.persist( Committers.supplierFromRunnable( new Runnable() { @Override public void run() { - committed.set(true); + doneSignal.countDown(); throw new RuntimeException(); } } ).get() ); - while (!committed.get()) { - Thread.sleep(100); - } + + doneSignal.await(); // Exception may need time to propagate while (metrics.failedPersists() < 1) { @@ -340,7 +344,6 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception { - final AtomicBoolean committed = new AtomicBoolean(false); Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01")); RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); @@ -355,7 +358,7 @@ ) ); Assert.assertNull(plumber2.startJob()); - + final CountDownLatch doneSignal = new CountDownLatch(1); final Committer committer = new Committer() { @Override @@ -367,7 +370,7 @@ @Override public void run() { - committed.set(true); + doneSignal.countDown(); } }; plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer)); @@ -378,9 +381,7 @@ plumber2.persist(committer); - while (!committed.get()) { - Thread.sleep(100); - } + doneSignal.await(); plumber2.getSinks().clear(); plumber2.finishJob(); @@ -438,6 +439,123 @@ Assert.assertEquals(0, restoredPlumber2.getSinks().size()); } + @Test(timeout = 60000) + public void testDimOrderInheritance() throws Exception + { +
final Object commitMetadata = "dummyCommitMetadata"; + testDimOrderInheritanceHelper(commitMetadata); + } + + private void testDimOrderInheritanceHelper(final Object commitMetadata) throws Exception + { + List<List<String>> expectedDims = ImmutableList.<List<String>>of( + ImmutableList.of("dimD"), + ImmutableList.of("dimC"), + ImmutableList.of("dimA"), + ImmutableList.of("dimB"), + ImmutableList.of("dimE"), + ImmutableList.of("dimD", "dimC", "dimA", "dimB", "dimE") + ); + + QueryableIndex qindex; + FireHydrant hydrant; + Map<Long, Sink> sinks; + + RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + Assert.assertNull(plumber.startJob()); + + final CountDownLatch doneSignal = new CountDownLatch(1); + + final Committer committer = new Committer() + { + @Override + public Object getMetadata() + { + return commitMetadata; + } + + @Override + public void run() + { + doneSignal.countDown(); + } + }; + + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimD"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimC"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimA"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimB"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimE"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + plumber.add( + getTestInputRowFull( + "1970-01-01", + ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"), + ImmutableList.of("1") + ), + Suppliers.ofInstance(committer) + ); + + plumber.persist(committer); + + doneSignal.await(); + + plumber.getSinks().clear(); + plumber.finishJob(); + + RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + schema2, + tuningConfig, + metrics + ); + restoredPlumber.bootstrapSinksFromDisk(); + + sinks = restoredPlumber.getSinks(); + Assert.assertEquals(1, sinks.size()); + List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L)); + + for (int i = 0; i < hydrants.size(); i++) { + hydrant = hydrants.get(i); + qindex = hydrant.getSegment().asQueryableIndex(); + Assert.assertEquals(i, hydrant.getCount()); + Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions())); + } + } + private InputRow getTestInputRow(final String timeStr) { return new InputRow() { @@ -492,4 +610,58 @@ }; } + private InputRow getTestInputRowFull(final String timeStr, final List<String> dims, final List<String> dimVals) + { + return new InputRow() + { + @Override + public List<String> getDimensions() + { + return dims; + } + + @Override + public long getTimestampFromEpoch() + { + return new DateTime(timeStr).getMillis(); + } + + @Override + public DateTime getTimestamp() + { + return new DateTime(timeStr); + } + + @Override + public List<String> getDimension(String dimension) + { + return dimVals; + } + + @Override + public float getFloatMetric(String metric) + { + return 0; + } + + @Override + public long getLongMetric(String metric) + { + return 0L; + } + + @Override + public Object getRaw(String dimension) + { + return null; + } + + @Override + public int compareTo(Row o) + { + return 0; + } + }; + } + }
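
A note for readers of the int[][] assertions in the merger tests above: each Rowboat carries, per dimension, the dictionary ids of that row's values, with each dimension's value dictionary sorted so that the empty string (null) comes first when present. The following is a hypothetical standalone sketch of that arithmetic, not Druid API, showing how to read an expected array such as {{0}, {2}}:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DictEncodeSketch
    {
      public static void main(String[] args)
      {
        // Sorted value dictionary for a dimension where "", "1", "2" occur.
        List<String> sortedValues = Arrays.asList("", "1", "2");
        Map<String, Integer> dict = new HashMap<>();
        for (int id = 0; id < sortedValues.size(); id++) {
          dict.put(sortedValues.get(id), id);
        }
        // A row with dimA="" and dimC="2" encodes as {{0}, {2}}, which is how
        // an assertion like boatList.get(1).getDims() above should be read.
        System.out.println(Arrays.asList(dict.get(""), dict.get("2"))); // [0, 2]
      }
    }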
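The ordering mechanism the patch relies on throughout, in Sink and in the testDimOrderInheritance expectations, comes down to replaying each prior index's dimension list into an insertion-ordered, deduplicating set. The sketch below is illustrative only (the class and method names are not from the patch); it shows why the final hydrant in testDimOrderInheritance is expected to report [dimD, dimC, dimA, dimB, dimE]:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class DimOrderSketch
    {
      // A LinkedHashSet keeps first-seen order and ignores re-insertions,
      // mirroring how dimOrder accumulates across hydrants in Sink.
      public static LinkedHashSet<String> mergeDimOrder(List<List<String>> perIndexDims)
      {
        LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
        for (List<String> dims : perIndexDims) {
          dimOrder.addAll(dims);
        }
        return dimOrder;
      }

      public static void main(String[] args)
      {
        // The six rows of testDimOrderInheritance, in ingestion order:
        System.out.println(mergeDimOrder(Arrays.asList(
            Arrays.asList("dimD"),
            Arrays.asList("dimC"),
            Arrays.asList("dimA"),
            Arrays.asList("dimB"),
            Arrays.asList("dimE"),
            Arrays.asList("dimA", "dimB", "dimC", "dimD", "dimE")
        )));
        // prints [dimD, dimC, dimA, dimB, dimE]
      }
    }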
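The inheritance is deliberately skipped when the user supplies an explicit dimension list: the Sink change checks hasCustomDimensions() before calling loadDimensionIterable(dimOrder), and getIndexWithDims(...) in the merger tests exercises the pinned case. A sketch of that precedence rule with stand-in types (DimsSpec and dimOrderForNewIndex here are illustrative; the real DimensionsSpec and loadDimensionIterable live in Druid):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class CustomDimsGuardSketch
    {
      // Stand-in for DimensionsSpec: "custom" means the user pinned the list.
      static class DimsSpec
      {
        final List<String> explicitDims; // null => discover dims from data

        DimsSpec(List<String> explicitDims)
        {
          this.explicitDims = explicitDims;
        }

        boolean hasCustomDimensions()
        {
          return explicitDims != null;
        }
      }

      static List<String> dimOrderForNewIndex(DimsSpec spec, LinkedHashSet<String> inherited)
      {
        // Mirrors the guard in the patch: an inherited order is applied
        // only when no explicit dimension list is configured.
        if (!spec.hasCustomDimensions()) {
          return Arrays.asList(inherited.toArray(new String[0]));
        }
        return spec.explicitDims;
      }

      public static void main(String[] args)
      {
        LinkedHashSet<String> inherited = new LinkedHashSet<>(Arrays.asList("dimB", "dimA"));
        System.out.println(dimOrderForNewIndex(new DimsSpec(null), inherited)); // [dimB, dimA]
        System.out.println(dimOrderForNewIndex(new DimsSpec(Arrays.asList("dimA", "dimB")), inherited)); // [dimA, dimB]
      }
    }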
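Finally, the RealtimePlumberSchoolTest changes swap the AtomicBoolean busy-wait for a CountDownLatch, which blocks the test thread without polling and cannot miss a signal that fires before the wait begins. A minimal standalone sketch of the pattern, with illustrative names:

    import java.util.concurrent.CountDownLatch;

    public class LatchSketch
    {
      public static void main(String[] args) throws InterruptedException
      {
        final CountDownLatch doneSignal = new CountDownLatch(1);

        // Stand-in for the Committer callback invoked when a persist completes.
        Runnable committer = new Runnable()
        {
          @Override
          public void run()
          {
            doneSignal.countDown();
          }
        };

        new Thread(committer).start(); // the persist runs on another thread

        // Replaces `while (!committed.get()) { Thread.sleep(100); }`:
        doneSignal.await(); // returns once countDown() has been called
      }
    }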