Preserve dimension order across indexes during ingestion

This commit is contained in:
jon-wei 2016-01-19 13:34:11 -08:00
parent df2906a91c
commit 747343e621
12 changed files with 1321 additions and 130 deletions

View File

@ -69,6 +69,8 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -199,7 +201,8 @@ public class IndexGeneratorJob implements Jobby
private static IncrementalIndex makeIncrementalIndex( private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket, Bucket theBucket,
AggregatorFactory[] aggs, AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder
) )
{ {
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@ -210,10 +213,16 @@ public class IndexGeneratorJob implements Jobby
.withMetrics(aggs) .withMetrics(aggs)
.build(); .build();
return new OnheapIncrementalIndex( OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
indexSchema, indexSchema,
tuningConfig.getRowFlushBoundary() tuningConfig.getRowFlushBoundary()
); );
if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder);
}
return newIndex;
} }
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable> public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
@ -310,9 +319,10 @@ public class IndexGeneratorJob implements Jobby
BytesWritable first = iter.next(); BytesWritable first = iter.next();
if (iter.hasNext()) { if (iter.hasNext()) {
LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config); IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
while (iter.hasNext()) { while (iter.hasNext()) {
@ -320,9 +330,10 @@ public class IndexGeneratorJob implements Jobby
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
if (!index.canAppendRow()) { if (!index.canAppendRow()) {
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context); flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config); index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder);
} }
index.add(value); index.add(value);
@ -523,7 +534,8 @@ public class IndexGeneratorJob implements Jobby
IncrementalIndex index = makeIncrementalIndex( IncrementalIndex index = makeIncrementalIndex(
bucket, bucket,
combiningAggs, combiningAggs,
config config,
null
); );
try { try {
File baseFlushFile = File.createTempFile("base", "flush"); File baseFlushFile = File.createTempFile("base", "flush");
@ -536,19 +548,20 @@ public class IndexGeneratorJob implements Jobby
int runningTotalLineCount = 0; int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
Set<String> allDimensionNames = Sets.newHashSet(); Set<String> allDimensionNames = Sets.newLinkedHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context); final ProgressIndicator progressIndicator = makeProgressIndicator(context);
for (final BytesWritable bw : values) { for (final BytesWritable bw : values) {
context.progress(); context.progress();
final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators)); final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow); int numRows = index.add(inputRow);
++lineCount; ++lineCount;
if (!index.canAppendRow()) { if (!index.canAppendRow()) {
allDimensionNames.addAll(index.getDimensionOrder());
log.info(index.getOutOfRowsReason()); log.info(index.getOutOfRowsReason());
log.info( log.info(
"%,d lines to %,d rows in %,d millis", "%,d lines to %,d rows in %,d millis",
@ -569,13 +582,16 @@ public class IndexGeneratorJob implements Jobby
index = makeIncrementalIndex( index = makeIncrementalIndex(
bucket, bucket,
combiningAggs, combiningAggs,
config config,
allDimensionNames
); );
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
++indexCount; ++indexCount;
} }
} }
allDimensionNames.addAll(index.getDimensionOrder());
log.info("%,d lines completed.", lineCount); log.info("%,d lines completed.", lineCount);
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount); List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);

View File

@ -30,10 +30,12 @@ import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec; import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
@ -76,9 +78,18 @@ import java.util.Map;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class IndexGeneratorJobTest public class IndexGeneratorJobTest
{ {
final private static AggregatorFactory[] aggs1 = {
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
};
@Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " + final private static AggregatorFactory[] aggs2 = {
"inputFormatName={4}, buildV9Directly={5}") new CountAggregatorFactory("count")
};
@Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
"data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
"aggs={8}, datasourceName={9}, buildV9Directly={10}")
public static Collection<Object[]> constructFeed() public static Collection<Object[]> constructFeed()
{ {
final List<Object[]> baseConstructors = Arrays.asList( final List<Object[]> baseConstructors = Arrays.asList(
@ -133,7 +144,10 @@ public class IndexGeneratorJobTest
null, null,
ImmutableList.of("timestamp", "host", "visited_num") ImmutableList.of("timestamp", "host", "visited_num")
) )
) ),
null,
aggs1,
"website"
}, },
{ {
false, false,
@ -175,7 +189,10 @@ public class IndexGeneratorJobTest
null, null,
ImmutableList.of("timestamp", "host", "visited_num") ImmutableList.of("timestamp", "host", "visited_num")
) )
) ),
null,
aggs1,
"website"
}, },
{ {
true, true,
@ -217,7 +234,10 @@ public class IndexGeneratorJobTest
null, null,
ImmutableList.of("timestamp", "host", "visited_num") ImmutableList.of("timestamp", "host", "visited_num")
) )
) ),
null,
aggs1,
"website"
}, },
{ {
false, false,
@ -269,7 +289,68 @@ public class IndexGeneratorJobTest
null, null,
ImmutableList.of("timestamp", "host", "visited_num") ImmutableList.of("timestamp", "host", "visited_num")
) )
) ),
null,
aggs1,
"website"
},
{
// Tests that new indexes inherit the dimension order from previous index
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{0, 1} // use a single partition, dimension order inheritance is not supported across partitions
}
},
ImmutableList.of(
"{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
"{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
"{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
"{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
"{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
"{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
),
null,
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null)
)
),
1, // force 1 row max per index for easier testing
aggs2,
"inherit_dims"
},
{
// Tests that pre-specified dim order is maintained across indexes.
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{0, 1}
}
},
ImmutableList.of(
"{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
"{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
"{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
"{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
"{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
"{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
),
null,
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), null, null)
)
),
1, // force 1 row max per index for easier testing
aggs2,
"inherit_dims2"
} }
} }
); );
@ -300,6 +381,9 @@ public class IndexGeneratorJobTest
private final List<String> data; private final List<String> data;
private final String inputFormatName; private final String inputFormatName;
private final InputRowParser inputRowParser; private final InputRowParser inputRowParser;
private final Integer maxRowsInMemory;
private final AggregatorFactory[] aggs;
private final String datasourceName;
private final boolean buildV9Directly; private final boolean buildV9Directly;
private ObjectMapper mapper; private ObjectMapper mapper;
@ -315,8 +399,11 @@ public class IndexGeneratorJobTest
List<String> data, List<String> data,
String inputFormatName, String inputFormatName,
InputRowParser inputRowParser, InputRowParser inputRowParser,
Integer maxRowsInMemory,
AggregatorFactory[] aggs,
String datasourceName,
boolean buildV9Directly boolean buildV9Directly
) throws IOException ) throws IOException
{ {
this.useCombiner = useCombiner; this.useCombiner = useCombiner;
this.partitionType = partitionType; this.partitionType = partitionType;
@ -325,6 +412,9 @@ public class IndexGeneratorJobTest
this.data = data; this.data = data;
this.inputFormatName = inputFormatName; this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser; this.inputRowParser = inputRowParser;
this.maxRowsInMemory = maxRowsInMemory;
this.aggs = aggs;
this.datasourceName = datasourceName;
this.buildV9Directly = buildV9Directly; this.buildV9Directly = buildV9Directly;
} }
@ -381,15 +471,12 @@ public class IndexGeneratorJobTest
config = new HadoopDruidIndexerConfig( config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec( new HadoopIngestionSpec(
new DataSchema( new DataSchema(
"website", datasourceName,
mapper.convertValue( mapper.convertValue(
inputRowParser, inputRowParser,
Map.class Map.class
), ),
new AggregatorFactory[]{ aggs,
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
new UniformGranularitySpec( new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
), ),
@ -406,7 +493,7 @@ public class IndexGeneratorJobTest
null, null,
null, null,
null, null,
null, maxRowsInMemory,
false, false,
false, false,
false, false,
@ -500,15 +587,29 @@ public class IndexGeneratorJobTest
Assert.assertTrue(indexZip.exists()); Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class); DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval()); Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
if (datasourceName.equals("website")) {
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
} else if (datasourceName.equals("inherit_dims")) {
Assert.assertEquals("inherit_dims", dataSegment.getDataSource());
Assert.assertEquals(ImmutableList.of("X", "Y", "M", "Q", "B", "F"), dataSegment.getDimensions());
Assert.assertEquals("count", dataSegment.getMetrics().get(0));
} else if (datasourceName.equals("inherit_dims2")) {
Assert.assertEquals("inherit_dims2", dataSegment.getDataSource());
Assert.assertEquals(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), dataSegment.getDimensions());
Assert.assertEquals("count", dataSegment.getMetrics().get(0));
} else {
Assert.fail("Test did not specify supported datasource name");
}
if (partitionType.equals("hashed")) { if (partitionType.equals("hashed")) {
Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum]; Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();

View File

@ -610,6 +610,7 @@ public class IndexIO
continue; continue;
} }
int emptyStrIdx = dictionary.indexOf("");
List<Integer> singleValCol = null; List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension); GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
@ -626,7 +627,7 @@ public class IndexIO
if (rowValue.size() > 1) { if (rowValue.size() > 1) {
onlyOneValue = false; onlyOneValue = false;
} }
if (rowValue.size() == 0) { if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) {
if (nullsSet == null) { if (nullsSet == null) {
nullsSet = bitmapFactory.makeEmptyMutableBitmap(); nullsSet = bitmapFactory.makeEmptyMutableBitmap();
} }

View File

@ -71,6 +71,7 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedRTree; import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.TmpFileIOPeon; import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.data.VSizeIndexedWriter; import io.druid.segment.data.VSizeIndexedWriter;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
@ -104,6 +105,7 @@ public class IndexMerger
{ {
private static final Logger log = new Logger(IndexMerger.class); private static final Logger log = new Logger(IndexMerger.class);
protected static final ListIndexed EMPTY_STR_DIM_VAL = new ListIndexed<>(Arrays.asList(""), String.class);
protected static final SerializerUtils serializerUtils = new SerializerUtils(); protected static final SerializerUtils serializerUtils = new SerializerUtils();
protected static final int INVALID_ROW = -1; protected static final int INVALID_ROW = -1;
protected static final Splitter SPLITTER = Splitter.on(","); protected static final Splitter SPLITTER = Splitter.on(",");
@ -269,19 +271,53 @@ public class IndexMerger
); );
} }
private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
{
int maxSize = 0;
Iterable<String> orderingCandidate = null;
for (IndexableAdapter index : indexes) {
int iterSize = index.getDimensionNames().size();
if (iterSize > maxSize) {
maxSize = iterSize;
orderingCandidate = index.getDimensionNames();
}
}
if (orderingCandidate == null) {
return null;
}
for (IndexableAdapter index : indexes) {
Iterator<String> candidateIter = orderingCandidate.iterator();
for (String matchDim : index.getDimensionNames()) {
boolean matched = false;
while (candidateIter.hasNext()) {
String nextDim = candidateIter.next();
if (matchDim.equals(nextDim)) {
matched = true;
break;
}
}
if (!matched) {
return null;
}
}
}
return ImmutableList.copyOf(orderingCandidate);
}
public static List<String> getMergedDimensions(List<IndexableAdapter> indexes) public static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
{ {
if (indexes.size() == 0) { if (indexes.size() == 0) {
return ImmutableList.of(); return ImmutableList.of();
} }
Indexed<String> dimOrder = indexes.get(0).getDimensionNames(); List<String> commonDimOrder = getLongestSharedDimOrder(indexes);
for (IndexableAdapter index : indexes) { if (commonDimOrder == null) {
Indexed<String> dimOrder2 = index.getDimensionNames(); log.warn("Indexes have incompatible dimension orders, using lexicographic order.");
if (!Iterators.elementsEqual(dimOrder.iterator(), dimOrder2.iterator())) { return getLexicographicMergedDimensions(indexes);
return getLexicographicMergedDimensions(indexes); } else {
} return commonDimOrder;
} }
return ImmutableList.copyOf(dimOrder);
} }
public File merge( public File merge(
@ -602,13 +638,35 @@ public class IndexMerger
); );
writer.open(); writer.open();
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size()); List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1);
DimValueConverter[] converters = new DimValueConverter[indexes.size()]; DimValueConverter[] converters = new DimValueConverter[indexes.size()];
boolean dimHasValues = false;
boolean[] dimHasValuesByIndex = new boolean[indexes.size()];
for (int i = 0; i < indexes.size(); i++) { for (int i = 0; i < indexes.size(); i++) {
Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension); Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) { if (!isNullColumn(dimValues)) {
dimHasValues = true;
dimHasValuesByIndex[i] = true;
dimValueLookups.add(dimValues); dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues); converters[i] = new DimValueConverter(dimValues);
} else {
dimHasValuesByIndex[i] = false;
}
}
/*
* Ensure the empty str is always in the dictionary if column is not null across indexes
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
* rows from indexes with partial null values for that dimension.
*/
if (dimHasValues) {
dimValueLookups.add(EMPTY_STR_DIM_VAL);
for (int i = 0; i < indexes.size(); i++) {
if (!dimHasValuesByIndex[i]) {
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
}
} }
} }
@ -652,6 +710,7 @@ public class IndexMerger
++count; ++count;
} }
dimensionCardinalities.put(dimension, count); dimensionCardinalities.put(dimension, count);
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true); FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
@ -725,8 +784,9 @@ public class IndexMerger
); );
} }
} }
) ),
, mergedDimensions, dimConversions.get(i), i mergedDimensions, dimConversions.get(i), i,
dimensionCardinalities
) )
); );
} }
@ -944,7 +1004,6 @@ public class IndexMerger
ByteStreams.copy(spatialWriter.combineStreams(), spatialOut); ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
spatialIoPeon.cleanup(); spatialIoPeon.cleanup();
} }
} }
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
@ -1161,18 +1220,22 @@ public class IndexMerger
private final List<String> convertedDims; private final List<String> convertedDims;
private final Map<String, IntBuffer> converters; private final Map<String, IntBuffer> converters;
private final int indexNumber; private final int indexNumber;
private final Map<String, Integer> dimCardinalities;
private static final int[] EMPTY_STR_DIM = new int[]{0};
MMappedIndexRowIterable( MMappedIndexRowIterable(
Iterable<Rowboat> index, Iterable<Rowboat> index,
List<String> convertedDims, List<String> convertedDims,
Map<String, IntBuffer> converters, Map<String, IntBuffer> converters,
int indexNumber int indexNumber,
Map<String, Integer> dimCardinalities
) )
{ {
this.index = index; this.index = index;
this.convertedDims = convertedDims; this.convertedDims = convertedDims;
this.converters = converters; this.converters = converters;
this.indexNumber = indexNumber; this.indexNumber = indexNumber;
this.dimCardinalities = dimCardinalities;
} }
public Iterable<Rowboat> getIndex() public Iterable<Rowboat> getIndex()
@ -1206,12 +1269,20 @@ public class IndexMerger
int[][] newDims = new int[convertedDims.size()][]; int[][] newDims = new int[convertedDims.size()][];
for (int i = 0; i < convertedDims.size(); ++i) { for (int i = 0; i < convertedDims.size(); ++i) {
IntBuffer converter = converterArray[i]; IntBuffer converter = converterArray[i];
String dimName = convertedDims.get(i);
if (converter == null) { if (converter == null) {
continue; continue;
} }
if (i >= dims.length || dims[i] == null) { if (i >= dims.length) {
continue;
}
if (dims[i] == null) {
if (dimCardinalities.get(dimName) > 0) {
newDims[i] = EMPTY_STR_DIM;
}
continue; continue;
} }

View File

@ -183,7 +183,7 @@ public class IndexMergerV9 extends IndexMerger
/************* Walk through data sets, merge them, and write merged columns *************/ /************* Walk through data sets, merge them, and write merged columns *************/
progress.progress(); progress.progress();
final Iterable<Rowboat> theRows = makeRowIterable( final Iterable<Rowboat> theRows = makeRowIterable(
adapters, mergedDimensions, mergedMetrics, dimConversions, rowMergerFn adapters, mergedDimensions, mergedMetrics, dimConversions, dimCardinalities, rowMergerFn
); );
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon); final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters( final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
@ -267,8 +267,8 @@ public class IndexMergerV9 extends IndexMerger
progress.startSection(section); progress.startSection(section);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
final Set<String> finalColumns = Sets.newTreeSet(); final Set<String> finalDimensions = Sets.newLinkedHashSet();
final Set<String> finalDimensions = Sets.newTreeSet(); final Set<String> finalColumns = Sets.newLinkedHashSet();
finalColumns.addAll(mergedMetrics); finalColumns.addAll(mergedMetrics);
for (int i = 0; i < mergedDimensions.size(); ++i) { for (int i = 0; i < mergedDimensions.size(); ++i) {
if (dimensionSkipFlag.get(i)) { if (dimensionSkipFlag.get(i)) {
@ -665,7 +665,7 @@ public class IndexMergerV9 extends IndexMerger
if (dimensionSkipFlag.get(i)) { if (dimensionSkipFlag.get(i)) {
continue; continue;
} }
if (dims[i] == null || dims[i].length == 0) { if (dims[i] == null || dims[i].length == 0 || (dims[i].length == 1 && dims[i][0] == 0)) {
nullRowsList.get(i).add(rowCount); nullRowsList.get(i).add(rowCount);
} }
dimWriters.get(i).add(dims[i]); dimWriters.get(i).add(dims[i]);
@ -778,6 +778,7 @@ public class IndexMergerV9 extends IndexMerger
final List<String> mergedDimensions, final List<String> mergedDimensions,
final List<String> mergedMetrics, final List<String> mergedMetrics,
final ArrayList<Map<String, IntBuffer>> dimConversions, final ArrayList<Map<String, IntBuffer>> dimConversions,
final Map<String, Integer> dimCardinalities,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
) )
{ {
@ -834,7 +835,8 @@ public class IndexMergerV9 extends IndexMerger
), ),
mergedDimensions, mergedDimensions,
dimConversions.get(i), dimConversions.get(i),
i i,
dimCardinalities
) )
); );
} }
@ -886,32 +888,39 @@ public class IndexMergerV9 extends IndexMerger
// each converter converts dim values of this dimension to global dictionary // each converter converts dim values of this dimension to global dictionary
DimValueConverter[] converters = new DimValueConverter[adapters.size()]; DimValueConverter[] converters = new DimValueConverter[adapters.size()];
boolean existNullColumn = false; boolean dimHasValues = false;
boolean[] dimHasValuesByIndex = new boolean[adapters.size()];
for (int i = 0; i < adapters.size(); i++) { for (int i = 0; i < adapters.size(); i++) {
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension); Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
if (!isNullColumn(dimValues)) { if (!isNullColumn(dimValues)) {
dimHasValues = true;
dimHasValuesByIndex[i] = true;
dimValueLookups.add(dimValues); dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues); converters[i] = new DimValueConverter(dimValues);
} else { } else {
existNullColumn = true; dimHasValuesByIndex[i] = false;
} }
} }
Iterable<Indexed<String>> bumpedDimValueLookups; /*
if (!dimValueLookups.isEmpty() && existNullColumn) { * Ensure the empty str is always in the dictionary if column is not null across indexes
log.info("dim[%s] are null in some indexes, append null value to dim values", dimension); * This is done so that MMappedIndexRowIterable can convert null columns to empty strings
bumpedDimValueLookups = Iterables.concat( * later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
Arrays.asList(new ArrayIndexed<>(new String[]{null}, String.class)), * rows from indexes with partial null values for that dimension.
dimValueLookups */
); if (dimHasValues) {
} else { dimValueLookups.add(EMPTY_STR_DIM_VAL);
bumpedDimValueLookups = dimValueLookups; for (int i = 0; i < adapters.size(); i++) {
if (!dimHasValuesByIndex[i]) {
converters[i] = new DimValueConverter(EMPTY_STR_DIM_VAL);
}
}
} }
// sort all dimension values and treat all null values as empty strings // sort all dimension values and treat all null values as empty strings
Iterable<String> dimensionValues = CombiningIterable.createSplatted( Iterable<String> dimensionValues = CombiningIterable.createSplatted(
Iterables.transform( Iterables.transform(
bumpedDimValueLookups, dimValueLookups,
new Function<Indexed<String>, Iterable<String>>() new Function<Indexed<String>, Iterable<String>>()
{ {
@Override @Override

View File

@ -401,10 +401,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
/** /**
* Adds a new row. The row might correspond with another row that already exists, in which case this will * Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one. * update that row instead of inserting a new one.
* <p/> * <p>
* <p/> * <p>
* Calls to add() are thread safe. * Calls to add() are thread safe.
* <p/> * <p>
* *
* @param row the row of data to add * @param row the row of data to add
* *
@ -599,6 +599,36 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return dimSpec == null ? null : dimSpec.getIndex(); return dimSpec == null ? null : dimSpec.getIndex();
} }
public List<String> getDimensionOrder()
{
synchronized (dimensionDescs) {
return ImmutableList.copyOf(dimensionDescs.keySet());
}
}
/*
* Currently called to initialize IncrementalIndex dimension order during index creation
* Index dimension ordering could be changed to initalize from DimensionsSpec after resolution of
* https://github.com/druid-io/druid/issues/2011
*/
public void loadDimensionIterable(Iterable<String> oldDimensionOrder)
{
synchronized (dimensionDescs) {
if (!dimensionDescs.isEmpty()) {
throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionDescs.keySet());
}
for (String dim : oldDimensionOrder) {
if (dimensionDescs.get(dim) == null) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dim, capabilities);
DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities);
dimensionDescs.put(dim, desc);
}
}
}
}
public List<String> getMetricNames() public List<String> getMetricNames()
{ {
return ImmutableList.copyOf(metricDescs.keySet()); return ImmutableList.copyOf(metricDescs.keySet());
@ -903,13 +933,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public int compareTo(TimeAndDims rhs) public int compareTo(TimeAndDims rhs)
{ {
int retVal = Longs.compare(timestamp, rhs.timestamp); int retVal = Longs.compare(timestamp, rhs.timestamp);
int numComparisons = Math.min(dims.length, rhs.dims.length);
if (retVal == 0) {
retVal = Ints.compare(dims.length, rhs.dims.length);
}
int index = 0; int index = 0;
while (retVal == 0 && index < dims.length) { while (retVal == 0 && index < numComparisons) {
String[] lhsVals = dims[index]; String[] lhsVals = dims[index];
String[] rhsVals = rhs.dims[index]; String[] rhsVals = rhs.dims[index];
@ -935,6 +962,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
++index; ++index;
} }
if (retVal == 0) {
return Ints.compare(dims.length, rhs.dims.length);
}
return retVal; return retVal;
} }

View File

@ -104,7 +104,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
10881, 10881,
1, 2,
null null
) )
), 71982, ), 71982,
@ -135,7 +135,7 @@ public class SegmentMetadataQueryTest
new ColumnAnalysis( new ColumnAnalysis(
ValueType.STRING.toString(), ValueType.STRING.toString(),
21762, 21762,
1, 2,
null null
) )
), ),

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
@ -41,7 +42,9 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -59,9 +62,11 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@Deprecated
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class IndexMergerTest public class IndexMergerTest
{ {
// Deprecated, use IndexMergerV9Test instead
@Rule @Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder(); public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -202,8 +207,8 @@ public class IndexMergerTest
} }
Assert.assertEquals(2, boatList.size()); Assert.assertEquals(2, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims());
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", "")); checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1"));
@ -445,7 +450,7 @@ public class IndexMergerTest
final QueryableIndex index2 = closer.closeLater( final QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex( INDEX_IO.loadIndex(
INDEX_MERGER.persist( INDEX_MERGER.persist(
toPersist1, toPersist2,
tmpDir2, tmpDir2,
indexSpec indexSpec
) )
@ -468,7 +473,7 @@ public class IndexMergerTest
Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
@ -493,8 +498,8 @@ public class IndexMergerTest
); );
QueryableIndex index1 = closer.closeLater( QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex( INDEX_IO.loadIndex(
INDEX_MERGER.append( INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), null, tempDir1, indexSpec ImmutableList.<IndexableAdapter>of(incrementalAdapter), null, tempDir1, indexSpec
) )
) )
@ -830,22 +835,111 @@ public class IndexMergerTest
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size()); Assert.assertEquals(3, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics()); Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
} }
@Test
public void testMergeWithDimensionsList() throws Exception
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(Arrays.asList("dimA", "dimB", "dimC"), null, null))
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.build();
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000);
addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist3, "dimC", Arrays.asList("1", "2"));
final File tmpDir = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist1,
tmpDir,
indexSpec
)
)
);
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
indexSpec
)
)
);
QueryableIndex index3 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist3,
tmpDir3,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics());
}
@Test @Test
public void testDisjointDimMerge() throws Exception public void testDisjointDimMerge() throws Exception
{ {
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
IncrementalIndex toPersistB2 = getIndexWithDims(Arrays.asList("dimA", "dimB"));
addDimValuesToIndex(toPersistB2, "dimB", Arrays.asList("1", "2", "3"));
final File tmpDirA = temporaryFolder.newFolder(); final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder(); final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirB2 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder(); final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater( QueryableIndex indexA = closer.closeLater(
@ -868,6 +962,16 @@ public class IndexMergerTest
) )
); );
QueryableIndex indexB2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB2,
tmpDirB2,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater( final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex( INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex( INDEX_MERGER.mergeQueryableIndex(
@ -879,12 +983,22 @@ public class IndexMergerTest
) )
); );
final QueryableIndex merged2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows(); List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) { final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
boatList.add(boat); List<Rowboat> boatList2 = ImmutableList.copyOf(adapter2.getRows());
}
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size()); Assert.assertEquals(5, boatList.size());
@ -899,6 +1013,19 @@ public class IndexMergerTest
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims()); Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics()); Assert.assertArrayEquals(new Object[]{1L}, boatList.get(4).getMetrics());
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter2.getDimensionNames()));
Assert.assertEquals(5, boatList2.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList2.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList2.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList2.get(2).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList2.get(3).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList2.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics());
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("dimA", "")); checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter.getBitmapIndex("dimA", ""));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimA", "1")); checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimA", "1"));
checkBitmapIndex(Lists.newArrayList(4), adapter.getBitmapIndex("dimA", "2")); checkBitmapIndex(Lists.newArrayList(4), adapter.getBitmapIndex("dimA", "2"));
@ -1017,10 +1144,10 @@ public class IndexMergerTest
ImmutableList.copyOf(adapter.getDimensionNames()) ImmutableList.copyOf(adapter.getDimensionNames())
); );
Assert.assertEquals(4, boatList.size()); Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims());
checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));
@ -1063,6 +1190,174 @@ public class IndexMergerTest
} }
} }
@Test
public void testMergeWithSupersetOrdering() throws Exception
{
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
toPersistBA2.add(
new MapBasedInputRow(
1,
Arrays.asList("dimB", "dimA"),
ImmutableMap.<String, Object>of("dimB", "1", "dimA", "")
)
);
toPersistBA2.add(
new MapBasedInputRow(
1,
Arrays.asList("dimB", "dimA"),
ImmutableMap.<String, Object>of("dimB", "", "dimA", "1")
)
);
IncrementalIndex toPersistC = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersistC, "dimC", Arrays.asList("1", "2", "3"));
final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirBA = temporaryFolder.newFolder();
final File tmpDirBA2 = temporaryFolder.newFolder();
final File tmpDirC = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
final File tmpDirMerged2 = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
indexSpec
)
)
);
QueryableIndex indexB = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
indexSpec
)
)
);
QueryableIndex indexBA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistBA,
tmpDirBA,
indexSpec
)
)
);
QueryableIndex indexBA2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistBA2,
tmpDirBA2,
indexSpec
)
)
);
QueryableIndex indexC = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistC,
tmpDirC,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexBA2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final QueryableIndex merged2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexC),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged2,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = ImmutableList.copyOf(boats);
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
Iterable<Rowboat> boats2 = adapter2.getRows();
List<Rowboat> boatList2 = ImmutableList.copyOf(boats2);
Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {0}}, boatList.get(4).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(4).getMetrics());
Assert.assertEquals(ImmutableList.of("dimA", "dimB", "dimC"), ImmutableList.copyOf(adapter2.getDimensionNames()));
Assert.assertEquals(12, boatList2.size());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}}, boatList2.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList2.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {3}}, boatList2.get(2).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(3).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(5).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(5).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}}, boatList2.get(6).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList2.get(6).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(7).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(7).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(8).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(8).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(9).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(9).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(10).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(10).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(11).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList2.get(11).getMetrics());
}
private IncrementalIndex getIndexD3() throws Exception private IncrementalIndex getIndexD3() throws Exception
{ {
IncrementalIndex toPersist1 = new OnheapIncrementalIndex( IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
@ -1108,8 +1403,14 @@ public class IndexMergerTest
1000 1000
); );
addDimValuesToIndex(toPersist1, dimName, values);
return toPersist1;
}
private void addDimValuesToIndex(IncrementalIndex index, String dimName, List<String> values) throws Exception
{
for (String val : values) { for (String val : values) {
toPersist1.add( index.add(
new MapBasedInputRow( new MapBasedInputRow(
1, 1,
Arrays.asList(dimName), Arrays.asList(dimName),
@ -1117,8 +1418,21 @@ public class IndexMergerTest
) )
); );
} }
}
return toPersist1; private IncrementalIndex getIndexWithDims(List<String> dims)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema(
0L,
QueryGranularity.NONE,
new DimensionsSpec(dims, null, null),
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
return new OnheapIncrementalIndex(
schema,
1000
);
} }
private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.granularity.QueryGranularity; import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
@ -41,6 +42,7 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
@ -64,16 +66,22 @@ public class IndexMergerV9Test
@Rule @Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder(); public final TemporaryFolder temporaryFolder = new TemporaryFolder();
protected final static IndexMergerV9 INDEX_MERGER = TestHelper.getTestIndexMergerV9(); protected IndexMerger INDEX_MERGER;
private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO(); private final static IndexIO INDEX_IO = TestHelper.getTestIndexIO();
@Parameterized.Parameters(name = "{index}: bitmap={0}, metric compression={1}, dimension compression={2}") @Parameterized.Parameters(name = "{index}: useV9={0}, bitmap={1}, metric compression={2}, dimension compression={3}")
public static Collection<Object[]> data() public static Collection<Object[]> data()
{ {
return Collections2.transform( return Collections2.transform(
Sets.cartesianProduct( Sets.cartesianProduct(
ImmutableList.of( ImmutableList.of(
ImmutableSet.of(new RoaringBitmapSerdeFactory(), new ConciseBitmapSerdeFactory()), ImmutableSet.of(
true,
false
),
ImmutableSet.of(
new RoaringBitmapSerdeFactory(),
new ConciseBitmapSerdeFactory()),
ImmutableSet.of( ImmutableSet.of(
CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED, CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED,
CompressedObjectStrategy.CompressionStrategy.LZ4, CompressedObjectStrategy.CompressionStrategy.LZ4,
@ -119,12 +127,18 @@ public class IndexMergerV9Test
public final CloserRule closer = new CloserRule(false); public final CloserRule closer = new CloserRule(false);
public IndexMergerV9Test( public IndexMergerV9Test(
boolean useV9,
BitmapSerdeFactory bitmapSerdeFactory, BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy, CompressedObjectStrategy.CompressionStrategy compressionStrategy,
CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy CompressedObjectStrategy.CompressionStrategy dimCompressionStrategy
) )
{ {
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy); this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy, dimCompressionStrategy);
if (useV9) {
INDEX_MERGER = TestHelper.getTestIndexMergerV9();
} else {
INDEX_MERGER = TestHelper.getTestIndexMerger();
}
} }
@Test @Test
@ -196,8 +210,8 @@ public class IndexMergerV9Test
} }
Assert.assertEquals(2, boatList.size()); Assert.assertEquals(2, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims());
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", "")); checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1"));
@ -364,7 +378,7 @@ public class IndexMergerV9Test
final QueryableIndex index2 = closer.closeLater( final QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex( INDEX_IO.loadIndex(
INDEX_MERGER.persist( INDEX_MERGER.persist(
toPersist1, toPersist2,
tmpDir2, tmpDir2,
indexSpec indexSpec
) )
@ -387,7 +401,7 @@ public class IndexMergerV9Test
Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(1, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(index2.getAvailableDimensions()));
Assert.assertEquals(1, merged.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(2, merged.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions())); Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
@ -452,7 +466,6 @@ public class IndexMergerV9Test
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
} }
@Test @Test
public void testAppendRetainsValues() throws Exception public void testAppendRetainsValues() throws Exception
{ {
@ -716,6 +729,7 @@ public class IndexMergerV9Test
compressedSupplierField.setAccessible(true); compressedSupplierField.setAccessible(true);
Object supplier = compressedSupplierField.get(obj); Object supplier = compressedSupplierField.get(obj);
Field compressionField = supplier.getClass().getDeclaredField("compression"); Field compressionField = supplier.getClass().getDeclaredField("compression");
compressionField.setAccessible(true); compressionField.setAccessible(true);
@ -724,14 +738,201 @@ public class IndexMergerV9Test
Assert.assertEquals(expectedStrategy, strategy); Assert.assertEquals(expectedStrategy, strategy);
} }
@Test
public void testNonLexicographicDimOrderMerge() throws Exception
{
IncrementalIndex toPersist1 = getIndexD3();
IncrementalIndex toPersist2 = getIndexD3();
IncrementalIndex toPersist3 = getIndexD3();
final File tmpDir = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist1,
tmpDir,
indexSpec
)
)
);
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
indexSpec
)
)
);
QueryableIndex index3 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist3,
tmpDir3,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d3", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d3", "30000"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d3", "40000"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d3", "50000"));
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d1", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d1", "100"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d1", "200"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d1", "300"));
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "2000"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("d2", "3000"));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("d2", "4000"));
}
@Test
public void testMergeWithDimensionsList() throws Exception
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(Arrays.asList("dimA", "dimB", "dimC"), null, null))
.withMinTimestamp(0L)
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.build();
IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, 1000);
IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, 1000);
addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist3, "dimC", Arrays.asList("1", "2"));
final File tmpDir = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDir3 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist1,
tmpDir,
indexSpec
)
)
);
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
indexSpec
)
)
);
QueryableIndex index3 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist3,
tmpDir3,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2, index3),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) {
boatList.add(boat);
}
Assert.assertEquals(ImmutableList.of("dimA", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics());
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dimA", ""));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimA", "1"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimA", "2"));
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dimB", ""));
checkBitmapIndex(Lists.newArrayList(2, 3), adapter.getBitmapIndex("dimC", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimC", "1"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimC", "2"));
}
@Test @Test
public void testDisjointDimMerge() throws Exception public void testDisjointDimMerge() throws Exception
{ {
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2")); IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
IncrementalIndex toPersistB2 = getIndexWithDims(Arrays.asList("dimA", "dimB"));
addDimValuesToIndex(toPersistB2, "dimB", Arrays.asList("1", "2", "3"));
final File tmpDirA = temporaryFolder.newFolder(); final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder(); final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirB2 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder(); final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater( QueryableIndex indexA = closer.closeLater(
@ -754,6 +955,16 @@ public class IndexMergerV9Test
) )
); );
QueryableIndex indexB2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB2,
tmpDirB2,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater( final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex( INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex( INDEX_MERGER.mergeQueryableIndex(
@ -765,12 +976,22 @@ public class IndexMergerV9Test
) )
); );
final QueryableIndex merged2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows(); List<Rowboat> boatList = ImmutableList.copyOf(adapter.getRows());
List<Rowboat> boatList = new ArrayList<>();
for (Rowboat boat : boats) { final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
boatList.add(boat); List<Rowboat> boatList2 = ImmutableList.copyOf(adapter2.getRows());
}
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames())); Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size()); Assert.assertEquals(5, boatList.size());
@ -793,6 +1014,28 @@ public class IndexMergerV9Test
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimB", "1")); checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimB", "1"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimB", "2")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimB", "2"));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimB", "3")); checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimB", "3"));
Assert.assertEquals(ImmutableList.of("dimA", "dimB"), ImmutableList.copyOf(adapter2.getDimensionNames()));
Assert.assertEquals(5, boatList2.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList2.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList2.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}}, boatList2.get(2).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList2.get(3).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList2.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics());
checkBitmapIndex(Lists.newArrayList(0, 1, 2), adapter2.getBitmapIndex("dimA", ""));
checkBitmapIndex(Lists.newArrayList(3), adapter2.getBitmapIndex("dimA", "1"));
checkBitmapIndex(Lists.newArrayList(4), adapter2.getBitmapIndex("dimA", "2"));
checkBitmapIndex(Lists.newArrayList(3, 4), adapter2.getBitmapIndex("dimB", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter2.getBitmapIndex("dimB", "1"));
checkBitmapIndex(Lists.newArrayList(1), adapter2.getBitmapIndex("dimB", "2"));
checkBitmapIndex(Lists.newArrayList(2), adapter2.getBitmapIndex("dimB", "3"));
} }
@Test @Test
@ -903,10 +1146,10 @@ public class IndexMergerV9Test
ImmutableList.copyOf(adapter.getDimensionNames()) ImmutableList.copyOf(adapter.getDimensionNames())
); );
Assert.assertEquals(4, boatList.size()); Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims()); Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims()); Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims());
checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", "")); checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210")); checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));
@ -949,6 +1192,199 @@ public class IndexMergerV9Test
} }
} }
@Test
public void testMergeWithSupersetOrdering() throws Exception
{
IncrementalIndex toPersistA = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistB = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2"));
IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
1000
);
toPersistBA2.add(
new MapBasedInputRow(
1,
Arrays.asList("dimB", "dimA"),
ImmutableMap.<String, Object>of("dimB", "1", "dimA", "")
)
);
toPersistBA2.add(
new MapBasedInputRow(
1,
Arrays.asList("dimB", "dimA"),
ImmutableMap.<String, Object>of("dimB", "", "dimA", "1")
)
);
IncrementalIndex toPersistC = getSingleDimIndex("dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersistC, "dimC", Arrays.asList("1", "2", "3"));
final File tmpDirA = temporaryFolder.newFolder();
final File tmpDirB = temporaryFolder.newFolder();
final File tmpDirBA = temporaryFolder.newFolder();
final File tmpDirBA2 = temporaryFolder.newFolder();
final File tmpDirC = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
final File tmpDirMerged2 = temporaryFolder.newFolder();
QueryableIndex indexA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
indexSpec
)
)
);
QueryableIndex indexB = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
indexSpec
)
)
);
QueryableIndex indexBA = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistBA,
tmpDirBA,
indexSpec
)
)
);
QueryableIndex indexBA2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistBA2,
tmpDirBA2,
indexSpec
)
)
);
QueryableIndex indexC = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersistC,
tmpDirC,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexBA2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final QueryableIndex merged2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(indexA, indexB, indexBA, indexC),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged2,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = ImmutableList.copyOf(boats);
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(merged2);
Iterable<Rowboat> boats2 = adapter2.getRows();
List<Rowboat> boatList2 = ImmutableList.copyOf(boats2);
Assert.assertEquals(ImmutableList.of("dimB", "dimA"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(5, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {0}}, boatList.get(4).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(4).getMetrics());
checkBitmapIndex(Lists.newArrayList(2, 3, 4), adapter.getBitmapIndex("dimA", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dimA", "1"));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("dimA", "2"));
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dimB", ""));
checkBitmapIndex(Lists.newArrayList(2), adapter.getBitmapIndex("dimB", "1"));
checkBitmapIndex(Lists.newArrayList(3), adapter.getBitmapIndex("dimB", "2"));
checkBitmapIndex(Lists.newArrayList(4), adapter.getBitmapIndex("dimB", "3"));
Assert.assertEquals(ImmutableList.of("dimA", "dimB", "dimC"), ImmutableList.copyOf(adapter2.getDimensionNames()));
Assert.assertEquals(12, boatList2.size());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}}, boatList2.get(0).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList2.get(1).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {3}}, boatList2.get(2).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(2).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(3).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(3).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(4).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(4).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(5).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(5).getMetrics());
Assert.assertArrayEquals(new int[][]{{1}, {0}, {0}}, boatList2.get(6).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList2.get(6).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(7).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(7).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}}, boatList2.get(8).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(8).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {2}, {0}}, boatList2.get(9).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(9).getMetrics());
Assert.assertArrayEquals(new int[][]{{0}, {3}, {0}}, boatList2.get(10).getDims());
Assert.assertArrayEquals(new Object[]{1L}, boatList2.get(10).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {0}, {0}}, boatList2.get(11).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList2.get(11).getMetrics());
checkBitmapIndex(Lists.newArrayList(0, 1, 2, 3, 4, 5, 8, 9, 10), adapter2.getBitmapIndex("dimA", ""));
checkBitmapIndex(Lists.newArrayList(6), adapter2.getBitmapIndex("dimA", "1"));
checkBitmapIndex(Lists.newArrayList(7, 11), adapter2.getBitmapIndex("dimA", "2"));
checkBitmapIndex(Lists.newArrayList(0, 1, 2, 6, 7, 11), adapter2.getBitmapIndex("dimB", ""));
checkBitmapIndex(Lists.newArrayList(3, 8), adapter2.getBitmapIndex("dimB", "1"));
checkBitmapIndex(Lists.newArrayList(4, 9), adapter2.getBitmapIndex("dimB", "2"));
checkBitmapIndex(Lists.newArrayList(5, 10), adapter2.getBitmapIndex("dimB", "3"));
checkBitmapIndex(Lists.newArrayList(3, 4, 5, 6, 7, 8, 9, 10, 11), adapter2.getBitmapIndex("dimC", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter2.getBitmapIndex("dimC", "1"));
checkBitmapIndex(Lists.newArrayList(1), adapter2.getBitmapIndex("dimC", "2"));
checkBitmapIndex(Lists.newArrayList(2), adapter2.getBitmapIndex("dimC", "3"));
}
private IncrementalIndex getIndexD3() throws Exception private IncrementalIndex getIndexD3() throws Exception
{ {
IncrementalIndex toPersist1 = new OnheapIncrementalIndex( IncrementalIndex toPersist1 = new OnheapIncrementalIndex(
@ -960,7 +1396,7 @@ public class IndexMergerV9Test
toPersist1.add( toPersist1.add(
new MapBasedInputRow( new MapBasedInputRow(
0, 1,
Arrays.asList("d3", "d1", "d2"), Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "100", "d2", "4000", "d3", "30000") ImmutableMap.<String, Object>of("d1", "100", "d2", "4000", "d3", "30000")
) )
@ -968,17 +1404,17 @@ public class IndexMergerV9Test
toPersist1.add( toPersist1.add(
new MapBasedInputRow( new MapBasedInputRow(
0, 1,
Arrays.asList("d3", "d1", "d2"), Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "200", "d2", "3000", "d3", "50000") ImmutableMap.<String, Object>of("d1", "300", "d2", "2000", "d3", "40000")
) )
); );
toPersist1.add( toPersist1.add(
new MapBasedInputRow( new MapBasedInputRow(
0, 1,
Arrays.asList("d3", "d1", "d2"), Arrays.asList("d3", "d1", "d2"),
ImmutableMap.<String, Object>of("d1", "300", "d2", "2000", "d3", "40000") ImmutableMap.<String, Object>of("d1", "200", "d2", "3000", "d3", "50000")
) )
); );
@ -994,8 +1430,14 @@ public class IndexMergerV9Test
1000 1000
); );
addDimValuesToIndex(toPersist1, dimName, values);
return toPersist1;
}
private void addDimValuesToIndex(IncrementalIndex index, String dimName, List<String> values) throws Exception
{
for (String val : values) { for (String val : values) {
toPersist1.add( index.add(
new MapBasedInputRow( new MapBasedInputRow(
1, 1,
Arrays.asList(dimName), Arrays.asList(dimName),
@ -1003,7 +1445,21 @@ public class IndexMergerV9Test
) )
); );
} }
return toPersist1;
} }
private IncrementalIndex getIndexWithDims(List<String> dims)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema(
0L,
QueryGranularity.NONE,
new DimensionsSpec(dims, null, null),
new AggregatorFactory[]{new CountAggregatorFactory("count")}
);
return new OnheapIncrementalIndex(
schema,
1000
);
}
} }

View File

@ -74,6 +74,8 @@ public class QueryableIndexIndexableAdapterTest {
IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1"); BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1");
IndexedInts indexedIntsNull = bitmapIndexSeeker.seek(null);
Assert.assertEquals(0, indexedIntsNull.size());
IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0"); IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0");
Assert.assertEquals(0, indexedInts0.size()); Assert.assertEquals(0, indexedInts0.size());
IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1"); IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1");

View File

@ -24,10 +24,13 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.IndexSizeExceededException;
@ -41,6 +44,8 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -55,6 +60,7 @@ public class Sink implements Iterable<FireHydrant>
private final RealtimeTuningConfig config; private final RealtimeTuningConfig config;
private final String version; private final String version;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>(); private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
private volatile FireHydrant currHydrant; private volatile FireHydrant currHydrant;
public Sink( public Sink(
@ -204,6 +210,18 @@ public class Sink implements Iterable<FireHydrant>
if (numHydrants > 0) { if (numHydrants > 0) {
FireHydrant lastHydrant = hydrants.get(numHydrants - 1); FireHydrant lastHydrant = hydrants.get(numHydrants - 1);
newCount = lastHydrant.getCount() + 1; newCount = lastHydrant.getCount() + 1;
if (!indexSchema.getDimensionsSpec().hasCustomDimensions()) {
if (lastHydrant.hasSwapped()) {
QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
}
} else {
IncrementalIndex oldIndex = lastHydrant.getIndex();
dimOrder.addAll(oldIndex.getDimensionOrder());
}
newIndex.loadDimensionIterable(dimOrder);
}
} }
currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier()); currHydrant = new FireHydrant(newIndex, newCount, getSegment().getIdentifier());
hydrants.add(currHydrant); hydrants.add(currHydrant);

View File

@ -21,6 +21,7 @@ package io.druid.segment.realtime.plumber;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Files; import com.google.common.io.Files;
@ -43,6 +44,7 @@ import io.druid.query.QueryRunnerFactory;
import io.druid.query.SegmentDescriptor; import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.TestHelper; import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.RealtimeTuningConfig;
@ -71,9 +73,9 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
*/ */
@ -245,7 +247,6 @@ public class RealtimePlumberSchoolTest
private void testPersist(final Object commitMetadata) throws Exception private void testPersist(final Object commitMetadata) throws Exception
{ {
final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks() plumber.getSinks()
.put( .put(
0L, 0L,
@ -262,6 +263,9 @@ public class RealtimePlumberSchoolTest
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row); EasyMock.replay(row);
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer() final Committer committer = new Committer()
{ {
@Override @Override
@ -273,15 +277,14 @@ public class RealtimePlumberSchoolTest
@Override @Override
public void run() public void run()
{ {
committed.set(true); doneSignal.countDown();
} }
}; };
plumber.add(row, Suppliers.ofInstance(committer)); plumber.add(row, Suppliers.ofInstance(committer));
plumber.persist(committer); plumber.persist(committer);
while (!committed.get()) { doneSignal.await();
Thread.sleep(100);
}
plumber.getSinks().clear(); plumber.getSinks().clear();
plumber.finishJob(); plumber.finishJob();
} }
@ -289,7 +292,6 @@ public class RealtimePlumberSchoolTest
@Test(timeout = 60000) @Test(timeout = 60000)
public void testPersistFails() throws Exception public void testPersistFails() throws Exception
{ {
final AtomicBoolean committed = new AtomicBoolean(false);
plumber.getSinks() plumber.getSinks()
.put( .put(
0L, 0L,
@ -306,6 +308,9 @@ public class RealtimePlumberSchoolTest
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>()); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row); EasyMock.replay(row);
plumber.add(row, Committers.supplierOf(Committers.nil())); plumber.add(row, Committers.supplierOf(Committers.nil()));
final CountDownLatch doneSignal = new CountDownLatch(1);
plumber.persist( plumber.persist(
Committers.supplierFromRunnable( Committers.supplierFromRunnable(
new Runnable() new Runnable()
@ -313,15 +318,14 @@ public class RealtimePlumberSchoolTest
@Override @Override
public void run() public void run()
{ {
committed.set(true); doneSignal.countDown();
throw new RuntimeException(); throw new RuntimeException();
} }
} }
).get() ).get()
); );
while (!committed.get()) {
Thread.sleep(100); doneSignal.await();
}
// Exception may need time to propagate // Exception may need time to propagate
while (metrics.failedPersists() < 1) { while (metrics.failedPersists() < 1) {
@ -340,7 +344,6 @@ public class RealtimePlumberSchoolTest
private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Exception
{ {
final AtomicBoolean committed = new AtomicBoolean(false);
Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01")); Interval testInterval = new Interval(new DateTime("1970-01-01"), new DateTime("1971-01-01"));
RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
@ -355,7 +358,7 @@ public class RealtimePlumberSchoolTest
) )
); );
Assert.assertNull(plumber2.startJob()); Assert.assertNull(plumber2.startJob());
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer() final Committer committer = new Committer()
{ {
@Override @Override
@ -367,7 +370,7 @@ public class RealtimePlumberSchoolTest
@Override @Override
public void run() public void run()
{ {
committed.set(true); doneSignal.countDown();
} }
}; };
plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer)); plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer));
@ -378,9 +381,7 @@ public class RealtimePlumberSchoolTest
plumber2.persist(committer); plumber2.persist(committer);
while (!committed.get()) { doneSignal.await();
Thread.sleep(100);
}
plumber2.getSinks().clear(); plumber2.getSinks().clear();
plumber2.finishJob(); plumber2.finishJob();
@ -438,6 +439,123 @@ public class RealtimePlumberSchoolTest
Assert.assertEquals(0, restoredPlumber2.getSinks().size()); Assert.assertEquals(0, restoredPlumber2.getSinks().size());
} }
@Test(timeout = 60000)
public void testDimOrderInheritance() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testDimOrderInheritanceHelper(commitMetadata);
}
private void testDimOrderInheritanceHelper(final Object commitMetadata) throws Exception
{
List<List<String>> expectedDims = ImmutableList.<List<String>>of(
ImmutableList.of("dimD"),
ImmutableList.of("dimC"),
ImmutableList.of("dimA"),
ImmutableList.of("dimB"),
ImmutableList.of("dimE"),
ImmutableList.of("dimD", "dimC", "dimA", "dimB", "dimE")
);
QueryableIndex qindex;
FireHydrant hydrant;
Map<Long, Sink> sinks;
RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics);
Assert.assertNull(plumber.startJob());
final CountDownLatch doneSignal = new CountDownLatch(1);
final Committer committer = new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
doneSignal.countDown();
}
};
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimD"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimC"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimA"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimB"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimE"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.add(
getTestInputRowFull(
"1970-01-01",
ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"),
ImmutableList.of("1")
),
Suppliers.ofInstance(committer)
);
plumber.persist(committer);
doneSignal.await();
plumber.getSinks().clear();
plumber.finishJob();
RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(
schema2,
tuningConfig,
metrics
);
restoredPlumber.bootstrapSinksFromDisk();
sinks = restoredPlumber.getSinks();
Assert.assertEquals(1, sinks.size());
List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
for (int i = 0; i < hydrants.size(); i++) {
hydrant = hydrants.get(i);
qindex = hydrant.getSegment().asQueryableIndex();
Assert.assertEquals(i, hydrant.getCount());
Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions()));
}
}
private InputRow getTestInputRow(final String timeStr) private InputRow getTestInputRow(final String timeStr)
{ {
return new InputRow() return new InputRow()
@ -492,4 +610,58 @@ public class RealtimePlumberSchoolTest
}; };
} }
private InputRow getTestInputRowFull(final String timeStr, final List<String> dims, final List<String> dimVals)
{
return new InputRow()
{
@Override
public List<String> getDimensions()
{
return dims;
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime(timeStr).getMillis();
}
@Override
public DateTime getTimestamp()
{
return new DateTime(timeStr);
}
@Override
public List<String> getDimension(String dimension)
{
return dimVals;
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public long getLongMetric(String metric)
{
return 0L;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
@Override
public int compareTo(Row o)
{
return 0;
}
};
}
} }