Merge pull request #1306 from metamx/cleanupIndexCreation

Cleanup some code in index creation and add `reprocess` method, fixes #1309
This commit is contained in:
Xavier Léauté 2015-04-23 13:14:34 -07:00
commit edb82607fe
12 changed files with 1023 additions and 208 deletions

View File

@ -65,13 +65,13 @@ import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier; import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue; import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.IndexedRTree; import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.VSizeIndexed; import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts; import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.serde.BitmapIndexColumnPartSupplier; import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
import io.druid.segment.serde.ComplexColumnPartSerde; import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier; import io.druid.segment.serde.ComplexColumnPartSupplier;
@ -82,7 +82,6 @@ import io.druid.segment.serde.FloatGenericColumnSupplier;
import io.druid.segment.serde.LongGenericColumnPartSerde; import io.druid.segment.serde.LongGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnSupplier; import io.druid.segment.serde.LongGenericColumnSupplier;
import io.druid.segment.serde.SpatialIndexColumnPartSupplier; import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -94,7 +93,6 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.AbstractList; import java.util.AbstractList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
@ -203,7 +201,8 @@ public class IndexIO
return convertSegment(toConvert, converted, indexSpec, false); return convertSegment(toConvert, converted, indexSpec, false);
} }
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) throws IOException public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent)
throws IOException
{ {
final int version = SegmentUtils.getVersionFromDir(toConvert); final int version = SegmentUtils.getVersionFromDir(toConvert);
@ -230,13 +229,8 @@ public class IndexIO
DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec); DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
return true; return true;
default: default:
if(forceIfCurrent){ if (forceIfCurrent) {
final QueryableIndexIndexableAdapter indexIndexableAdapter = new QueryableIndexIndexableAdapter(loadIndex(toConvert)); IndexMaker.convert(toConvert, converted, indexSpec);
IndexMaker.append(
Collections.<IndexableAdapter>singletonList(indexIndexableAdapter),
converted,
indexSpec
);
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted); DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
return true; return true;
} else { } else {
@ -251,6 +245,112 @@ public class IndexIO
public MMappedIndex mapDir(File inDir) throws IOException; public MMappedIndex mapDir(File inDir) throws IOException;
} }
public static void validateRowValues(
Rowboat rb1,
IndexableAdapter adapter1,
Rowboat rb2,
IndexableAdapter adapter2
)
{
final int[][] dims1 = rb1.getDims();
final int[][] dims2 = rb2.getDims();
if (dims1.length != dims2.length) {
throw new SegmentValidationException(
"Dim lengths not equal %s vs %s",
Arrays.deepToString(dims1),
Arrays.deepToString(dims2)
);
}
final Indexed<String> dim1Names = adapter1.getDimensionNames();
final Indexed<String> dim2Names = adapter2.getDimensionNames();
for (int i = 0; i < dims1.length; ++i) {
final int[] dim1Vals = dims1[i];
final int[] dim2Vals = dims2[i];
final String dim1Name = dim1Names.get(i);
final String dim2Name = dim2Names.get(i);
final Indexed<String> dim1ValNames = adapter1.getDimValueLookup(dim1Name);
final Indexed<String> dim2ValNames = adapter2.getDimValueLookup(dim2Name);
if (dim1Vals == null || dim2Vals == null) {
if (dim1Vals != dim2Vals) {
throw new SegmentValidationException(
"Expected nulls, found %s and %s",
Arrays.toString(dim1Vals),
Arrays.toString(dim2Vals)
);
} else {
continue;
}
}
if (dim1Vals.length != dim2Vals.length) {
// Might be OK if one of them has null. This occurs in IndexMakerTest
if (dim1Vals.length == 0 && dim2Vals.length == 1) {
final String dimValName = dim2ValNames.get(dim2Vals[0]);
if (dimValName == null) {
continue;
} else {
throw new SegmentValidationException(
"Dim [%s] value [%s] is not null",
dim2Name,
dimValName
);
}
} else if (dim2Vals.length == 0 && dim1Vals.length == 1) {
final String dimValName = dim1ValNames.get(dim1Vals[0]);
if (dimValName == null) {
continue;
} else {
throw new SegmentValidationException(
"Dim [%s] value [%s] is not null",
dim1Name,
dimValName
);
}
} else {
throw new SegmentValidationException(
"Dim [%s] value lengths not equal. Expected %d found %d",
dim1Name,
dims1.length,
dims2.length
);
}
}
for (int j = 0; j < Math.max(dim1Vals.length, dim2Vals.length); ++j) {
final int dIdex1 = dim1Vals.length <= j ? -1 : dim1Vals[j];
final int dIdex2 = dim2Vals.length <= j ? -1 : dim2Vals[j];
if (dIdex1 == dIdex2) {
continue;
}
final String dim1ValName = dIdex1 < 0 ? null : dim1ValNames.get(dIdex1);
final String dim2ValName = dIdex2 < 0 ? null : dim2ValNames.get(dIdex2);
if ((dim1ValName == null) || (dim2ValName == null)) {
if ((dim1ValName == null) && (dim2ValName == null)) {
continue;
} else {
throw new SegmentValidationException(
"Dim [%s] value not equal. Expected [%s] found [%s]",
dim1Name,
dim1ValName,
dim2ValName
);
}
}
if (!dim1ValName.equals(dim2ValName)) {
throw new SegmentValidationException(
"Dim [%s] value not equal. Expected [%s] found [%s]",
dim1Name,
dim1ValName,
dim2ValName
);
}
}
}
}
public static class DefaultIndexIOHandler implements IndexIOHandler public static class DefaultIndexIOHandler implements IndexIOHandler
{ {
private static final Logger log = new Logger(DefaultIndexIOHandler.class); private static final Logger log = new Logger(DefaultIndexIOHandler.class);
@ -278,10 +378,10 @@ public class IndexIO
indexBuffer.get(); // Skip the version byte indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> availableDimensions = GenericIndexed.read( final GenericIndexed<String> availableDimensions = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.STRING_STRATEGY
); );
final GenericIndexed<String> availableMetrics = GenericIndexed.read( final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.STRING_STRATEGY
); );
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory(); final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
@ -315,7 +415,7 @@ public class IndexIO
fileDimensionName fileDimensionName
); );
dimValueLookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.stringStrategy)); dimValueLookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.STRING_STRATEGY));
dimColumns.put(dimension, VSizeIndexed.readFromByteBuffer(dimBuffer)); dimColumns.put(dimension, VSizeIndexed.readFromByteBuffer(dimBuffer));
} }
@ -359,30 +459,68 @@ public class IndexIO
public static void validateTwoSegments(File dir1, File dir2) throws IOException public static void validateTwoSegments(File dir1, File dir2) throws IOException
{ {
final QueryableIndexIndexableAdapter adapter1 = new QueryableIndexIndexableAdapter(loadIndex(dir1)); validateTwoSegments(
final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(loadIndex(dir2)); new QueryableIndexIndexableAdapter(loadIndex(dir1)),
if(adapter1.getNumRows() != adapter2.getNumRows()){ new QueryableIndexIndexableAdapter(loadIndex(dir2))
throw new IOException("Validation failure - Row count mismatch"); );
}
public static void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2)
{
if (adapter1.getNumRows() != adapter2.getNumRows()) {
throw new SegmentValidationException(
"Row count mismatch. Expected [%d] found [%d]",
adapter1.getNumRows(),
adapter2.getNumRows()
);
}
{
final Set<String> dimNames1 = Sets.newHashSet(adapter1.getDimensionNames());
final Set<String> dimNames2 = Sets.newHashSet(adapter2.getDimensionNames());
if (!dimNames1.equals(dimNames2)) {
throw new SegmentValidationException(
"Dimension names differ. Expected [%s] found [%s]",
dimNames1,
dimNames2
);
}
final Set<String> metNames1 = Sets.newHashSet(adapter1.getMetricNames());
final Set<String> metNames2 = Sets.newHashSet(adapter2.getMetricNames());
if (!metNames1.equals(metNames2)) {
throw new SegmentValidationException("Metric names differ. Expected [%s] found [%s]", metNames1, metNames2);
}
} }
final Iterator<Rowboat> it1 = adapter1.getRows().iterator(); final Iterator<Rowboat> it1 = adapter1.getRows().iterator();
final Iterator<Rowboat> it2 = adapter2.getRows().iterator(); final Iterator<Rowboat> it2 = adapter2.getRows().iterator();
long row = 0L; long row = 0L;
while(it1.hasNext()){ while (it1.hasNext()) {
if(it1.hasNext() ^ it2.hasNext()){ if (!it2.hasNext()) {
throw new IOException("Validation failure - Iterator doesn't have enough"); throw new SegmentValidationException("Unexpected end of second adapter");
} }
final Rowboat rb1 = it1.next(); final Rowboat rb1 = it1.next();
final Rowboat rb2 = it2.next(); final Rowboat rb2 = it2.next();
++row; ++row;
if(rb1.compareTo(rb2) != 0){ if (rb1.getRowNum() != rb2.getRowNum()) {
throw new IOException(String.format("Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2)); throw new SegmentValidationException("Row number mismatch: [%d] vs [%d]", rb1.getRowNum(), rb2.getRowNum());
}
if (rb1.compareTo(rb2) != 0) {
try {
validateRowValues(rb1, adapter1, rb2, adapter2);
}
catch (SegmentValidationException ex) {
throw new SegmentValidationException(ex, "Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2);
}
} }
} }
if(it2.hasNext()){ if (it2.hasNext()) {
throw new IOException("Validation failure - Iterator still has more"); throw new SegmentValidationException("Unexpected end of first adapter");
} }
if(row != adapter1.getNumRows()){ if (row != adapter1.getNumRows()) {
throw new IOException("Validation failure - Actual Row count mismatch"); throw new SegmentValidationException(
"Actual Row count mismatch. Expected [%d] found [%d]",
row,
adapter1.getNumRows()
);
} }
} }
@ -455,7 +593,7 @@ public class IndexIO
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
GenericIndexed<String> dictionary = GenericIndexed.read( GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.stringStrategy dimBuffer, GenericIndexed.STRING_STRATEGY
); );
if (dictionary.size() == 0) { if (dictionary.size() == 0) {
@ -503,7 +641,7 @@ public class IndexIO
dictionary = GenericIndexed.fromIterable( dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dictionary), Iterables.concat(nullList, dictionary),
GenericIndexed.stringStrategy GenericIndexed.STRING_STRATEGY
); );
bitmaps = GenericIndexed.fromIterable( bitmaps = GenericIndexed.fromIterable(
@ -574,8 +712,11 @@ public class IndexIO
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
} }
} else { } else {
if(compressionStrategy != null) { if (compressionStrategy != null) {
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension); log.info(
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
dimension
);
} }
columnPartBuilder.withMultiValuedColumn(multiValCol); columnPartBuilder.withMultiValuedColumn(multiValCol);
} }
@ -670,7 +811,7 @@ public class IndexIO
indexBuffer.get(); // Skip the version byte indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims8 = GenericIndexed.read( final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.STRING_STRATEGY
); );
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable( final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter( Iterables.filter(
@ -683,10 +824,10 @@ public class IndexIO
} }
} }
), ),
GenericIndexed.stringStrategy GenericIndexed.STRING_STRATEGY
); );
final GenericIndexed<String> availableMetrics = GenericIndexed.read( final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.stringStrategy indexBuffer, GenericIndexed.STRING_STRATEGY
); );
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
@ -698,7 +839,7 @@ public class IndexIO
columns.addAll(Lists.newArrayList(dims9)); columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics)); columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy); GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
@ -840,8 +981,8 @@ public class IndexIO
* Index.drd should consist of the segment version, the columns and dimensions of the segment as generic * Index.drd should consist of the segment version, the columns and dimensions of the segment as generic
* indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type. * indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type.
*/ */
final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy); final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy); final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong()); final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory; final BitmapSerdeFactory segmentBitmapSerdeFactory;
/** /**

View File

@ -92,6 +92,7 @@ import java.nio.LongBuffer;
import java.util.AbstractList; import java.util.AbstractList;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -321,6 +322,37 @@ public class IndexMaker
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
} }
public static File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
{
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
}
public static File convert(
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
) throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir));
return makeIndexFiles(
ImmutableList.of(adapter),
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
{
return input.get(0);
}
},
indexSpec
);
}
public static File append( public static File append(
final List<IndexableAdapter> adapters, final List<IndexableAdapter> adapters,
final File outDir, final File outDir,
@ -504,7 +536,16 @@ public class IndexMaker
makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec); makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec);
progress.progress(); progress.progress();
makeIndexBinary(v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec); makeIndexBinary(
v9Smoosher,
adapters,
outDir,
mergedDimensions,
mergedMetrics,
skippedDimensions,
progress,
indexSpec
);
v9Smoosher.close(); v9Smoosher.close();
@ -796,6 +837,33 @@ public class IndexMaker
progress.stopSection(dimSection); progress.stopSection(dimSection);
} }
private static class NullsAtZeroConvertingIntList extends AbstractList<Integer>
{
private final List<Integer> delegate;
private final boolean delegateHasNullAtZero;
NullsAtZeroConvertingIntList(List<Integer> delegate, final boolean delegateHasNullAtZero)
{
this.delegate = delegate;
this.delegateHasNullAtZero = delegateHasNullAtZero;
}
@Override
public Integer get(int index)
{
Integer val = delegate.get(index);
if (val == null) {
return 0;
}
return delegateHasNullAtZero ? val : val + 1;
}
@Override
public int size()
{
return delegate.size();
}
}
private static void makeDimColumn( private static void makeDimColumn(
final FileSmoosher v9Smoosher, final FileSmoosher v9Smoosher,
final List<IndexableAdapter> adapters, final List<IndexableAdapter> adapters,
@ -860,7 +928,7 @@ public class IndexMaker
final Iterable<String> dimensionValues = dimensionValuesLookup.get(dimension); final Iterable<String> dimensionValues = dimensionValuesLookup.get(dimension);
GenericIndexed<String> dictionary = GenericIndexed.fromIterable( GenericIndexed<String> dictionary = GenericIndexed.fromIterable(
dimensionValues, dimensionValues,
GenericIndexed.stringStrategy GenericIndexed.STRING_STRATEGY
); );
boolean bumpDictionary = false; boolean bumpDictionary = false;
@ -873,127 +941,67 @@ public class IndexMaker
bumpDictionary = true; bumpDictionary = true;
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
final List<String> nullList = Lists.newArrayList();
nullList.add(null);
dictionary = GenericIndexed.fromIterable( dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dimensionValues), Iterables.concat(Collections.<String>singleton(null), dimensionValues),
GenericIndexed.stringStrategy GenericIndexed.STRING_STRATEGY
); );
final int dictionarySize = dictionary.size(); final int dictionarySize = dictionary.size();
singleValCol = null; singleValCol = null;
multiValCol = VSizeIndexed.fromIterable( multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable Iterables.transform(
.create(vals) vals,
.transform( new Function<List<Integer>, VSizeIndexedInts>()
new Function<List<Integer>, VSizeIndexedInts>() {
{ @Override
@Override public VSizeIndexedInts apply(final List<Integer> input)
public VSizeIndexedInts apply(final List<Integer> input) {
{ if (input == null) {
if (input == null) { return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
return VSizeIndexedInts.fromList( } else {
new AbstractList<Integer>() return VSizeIndexedInts.fromList(new NullsAtZeroConvertingIntList(input, false), dictionarySize);
{
@Override
public Integer get(int index)
{
return 0;
}
@Override
public int size()
{
return 1;
}
}, dictionarySize
);
}
return VSizeIndexedInts.fromList(
new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
Integer val = input.get(index);
if (val == null) {
return 0;
}
return val + 1;
}
@Override
public int size()
{
return input.size();
}
},
dictionarySize
);
}
} }
) }
}
)
); );
} else { } else {
final int dictionarySize = dictionary.size(); final int dictionarySize = dictionary.size();
singleValCol = null; singleValCol = null;
multiValCol = VSizeIndexed.fromIterable( multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable Iterables.transform(
.create(vals) vals,
.transform( new Function<List<Integer>, VSizeIndexedInts>()
new Function<List<Integer>, VSizeIndexedInts>() {
{ @Override
@Override public VSizeIndexedInts apply(List<Integer> input)
public VSizeIndexedInts apply(List<Integer> input) {
{ if (input == null) {
if (input == null) { return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
return VSizeIndexedInts.fromList( } else {
new AbstractList<Integer>() return VSizeIndexedInts.fromList(input, dictionarySize);
{
@Override
public Integer get(int index)
{
return 0;
}
@Override
public int size()
{
return 1;
}
}, dictionarySize
);
}
return VSizeIndexedInts.fromList(
input,
dictionarySize
);
}
} }
) }
}
)
); );
} }
} else { } else {
final int dictionarySize = dictionary.size(); final int dictionarySize = dictionary.size();
singleValCol = null; singleValCol = null;
multiValCol = VSizeIndexed.fromIterable( multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable Iterables.transform(
.create(vals) vals,
.transform( new Function<List<Integer>, VSizeIndexedInts>()
new Function<List<Integer>, VSizeIndexedInts>() {
{ @Override
@Override public VSizeIndexedInts apply(List<Integer> input)
public VSizeIndexedInts apply(List<Integer> input) {
{ return VSizeIndexedInts.fromList(input, dictionarySize);
return VSizeIndexedInts.fromList( }
input, }
dictionarySize )
);
}
}
)
); );
} }
} else { } else {
@ -1011,47 +1019,13 @@ public class IndexMaker
dictionary = GenericIndexed.fromIterable( dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dimensionValues), Iterables.concat(nullList, dimensionValues),
GenericIndexed.stringStrategy GenericIndexed.STRING_STRATEGY
); );
multiValCol = null; multiValCol = null;
singleValCol = new AbstractList<Integer>() singleValCol = new NullsAtZeroConvertingIntList(vals, false);
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val + 1;
}
@Override
public int size()
{
return vals.size();
}
};
} else { } else {
multiValCol = null; multiValCol = null;
singleValCol = new AbstractList<Integer>() singleValCol = new NullsAtZeroConvertingIntList(vals, true);
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val;
}
@Override
public int size()
{
return vals.size();
}
};
} }
} else { } else {
multiValCol = null; multiValCol = null;
@ -1214,8 +1188,11 @@ public class IndexMaker
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
} }
} else { } else {
if(compressionStrategy != null) { if (compressionStrategy != null) {
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension); log.info(
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
dimension
);
} }
dimPartBuilder.withMultiValuedColumn(multiValCol); dimPartBuilder.withMultiValuedColumn(multiValCol);
} }
@ -1399,8 +1376,8 @@ public class IndexMaker
} }
); );
GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.stringStrategy); GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.STRING_STRATEGY);
GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.stringStrategy); GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.STRING_STRATEGY);
final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()); final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory());
final long numBytes = cols.getSerializedSize() final long numBytes = cols.getSerializedSize()
@ -1634,7 +1611,7 @@ public class IndexMaker
{ {
int[][] dims = input.getDims(); int[][] dims = input.getDims();
int[][] newDims = new int[convertedDims.size()][]; int[][] newDims = new int[convertedDims.size()][];
for (int i = 0; i < convertedDims.size(); ++i) { for (int i = 0; i < newDims.length; ++i) {
IntBuffer converter = converters.get(convertedDims.get(i)); IntBuffer converter = converters.get(convertedDims.get(i));
if (converter == null) { if (converter == null) {

View File

@ -144,13 +144,22 @@ public class IndexMerger
* *
* @throws java.io.IOException if an IO error occurs persisting the index * @throws java.io.IOException if an IO error occurs persisting the index
*/ */
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec) throws IOException public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
IndexSpec indexSpec
) throws IOException
{ {
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator()); return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
} }
public static File persist( public static File persist(
final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec, ProgressIndicator progress final IncrementalIndex index,
final Interval dataInterval,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException ) throws IOException
{ {
if (index.isEmpty()) { if (index.isEmpty()) {
@ -199,7 +208,11 @@ public class IndexMerger
} }
public static File mergeQueryableIndex( public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress List<QueryableIndex> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException ) throws IOException
{ {
return merge( return merge(
@ -229,7 +242,11 @@ public class IndexMerger
} }
public static File merge( public static File merge(
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException ) throws IOException
{ {
FileUtils.deleteDirectory(outDir); FileUtils.deleteDirectory(outDir);
@ -453,8 +470,8 @@ public class IndexMerger
channel = fileOutputStream.getChannel(); channel = fileOutputStream.getChannel();
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION})); channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy).writeToChannel(channel); GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy).writeToChannel(channel); GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT); DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT); DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
@ -492,7 +509,7 @@ public class IndexMerger
for (String dimension : mergedDimensions) { for (String dimension : mergedDimensions) {
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>( final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
ioPeon, dimension, GenericIndexed.stringStrategy ioPeon, dimension, GenericIndexed.STRING_STRATEGY
); );
writer.open(); writer.open();
@ -762,7 +779,7 @@ public class IndexMerger
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
} }
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
@ -875,8 +892,8 @@ public class IndexMerger
createIndexDrdFile( createIndexDrdFile(
IndexIO.V8_VERSION, IndexIO.V8_VERSION,
v8OutDir, v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy), GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy), GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
dataInterval, dataInterval,
indexSpec.getBitmapSerdeFactory() indexSpec.getBitmapSerdeFactory()
); );

View File

@ -17,13 +17,19 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeSet; import java.util.TreeSet;
@ -129,8 +135,8 @@ public class Rowboat implements Comparable<Rowboat>
{ {
return "Rowboat{" + return "Rowboat{" +
"timestamp=" + new DateTime(timestamp).toString() + "timestamp=" + new DateTime(timestamp).toString() +
", dims=" + (dims == null ? null : Arrays.asList(dims)) + ", dims=" + Arrays.deepToString(dims) +
", metrics=" + (metrics == null ? null : Arrays.asList(metrics)) + ", metrics=" + Arrays.toString(metrics) +
", comprisedRows=" + comprisedRows + ", comprisedRows=" + comprisedRows +
'}'; '}';
} }

View File

@ -0,0 +1,36 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
/**
*
*/
public class SegmentValidationException extends RuntimeException
{
public SegmentValidationException(String formatText, Object... arguments)
{
super(String.format(formatText, arguments));
}
public SegmentValidationException(Throwable cause, String formatText, Object... arguments)
{
super(String.format(formatText, arguments), cause);
}
}

View File

@ -319,7 +319,7 @@ public class GenericIndexed<T> implements Indexed<T>
throw new IAE("Unknown version[%s]", versionFromBuffer); throw new IAE("Unknown version[%s]", versionFromBuffer);
} }
public static ObjectStrategy<String> stringStrategy = new CacheableObjectStrategy<String>() public static final ObjectStrategy<String> STRING_STRATEGY = new CacheableObjectStrategy<String>()
{ {
@Override @Override
public Class<? extends String> getClazz() public Class<? extends String> getClazz()

View File

@ -45,11 +45,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{ {
private static final Logger log = new Logger(IncrementalIndexAdapter.class); private static final Logger log = new Logger(IncrementalIndexAdapter.class);
private final Interval dataInterval; private final Interval dataInterval;
private final IncrementalIndex<Object> index; private final IncrementalIndex<?> index;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes; private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
public IncrementalIndexAdapter( public IncrementalIndexAdapter(
Interval dataInterval, IncrementalIndex<Object> index, BitmapFactory bitmapFactory Interval dataInterval, IncrementalIndex<?> index, BitmapFactory bitmapFactory
) )
{ {
this.dataInterval = dataInterval; this.dataInterval = dataInterval;

View File

@ -317,7 +317,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns"); throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
} }
final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.stringStrategy); final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY);
builder.setType(ValueType.STRING); builder.setType(ValueType.STRING);
final WritableSupplier<IndexedInts> rSingleValuedColumn; final WritableSupplier<IndexedInts> rSingleValuedColumn;

View File

@ -17,16 +17,291 @@
package io.druid.segment; package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.UOE;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* This is mostly a test of the validator
*/
@RunWith(Parameterized.class)
public class IndexIOTest public class IndexIOTest
{ {
@Test @Ignore // this test depends on static fields, so it has to be tested independently private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01");
public void testInjector() throws Exception private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4
);
private static <T> List<T> filterByBitset(List<T> list, BitSet bitSet)
{ {
System.setProperty("druid.processing.columnCache.sizeBytes", "1234"); final ArrayList<T> outList = new ArrayList<>(bitSet.cardinality());
Assert.assertEquals(1234, IndexIO.columnConfig.columnCacheSizeBytes()); for (int i = 0; i < list.size(); ++i) {
if (bitSet.get(i)) {
outList.add(list.get(i));
}
}
return outList;
}
@Parameterized.Parameters
public static Iterable<Object[]> constructionFeeder()
{
final Map<String, Object> map = ImmutableMap.<String, Object>of();
final Map<String, Object> map00 = ImmutableMap.<String, Object>of(
"dim0", ImmutableList.<String>of("dim00", "dim01")
);
final Map<String, Object> map10 = ImmutableMap.<String, Object>of(
"dim1", "dim10"
);
final Map<String, Object> map0null = new HashMap<>();
map0null.put("dim0", null);
final Map<String, Object> map1null = new HashMap<>();
map1null.put("dim1", null);
final Map<String, Object> mapAll = ImmutableMap.<String, Object>of(
"dim0", ImmutableList.<String>of("dim00", "dim01"),
"dim1", "dim10"
);
final List<Map<String, Object>> maps = ImmutableList.of(
map, map00, map10, map0null, map1null, mapAll
);
return Iterables.<Object[]>concat(
// First iterable tests permutations of the maps which are expected to be equal
Iterables.<Object[]>concat(
new Iterable<Iterable<Object[]>>()
{
@Override
public Iterator<Iterable<Object[]>> iterator()
{
return new Iterator<Iterable<Object[]>>()
{
long nextBitset = 1L;
@Override
public boolean hasNext()
{
return nextBitset < (1L << maps.size());
}
@Override
public Iterable<Object[]> next()
{
final BitSet bitset = BitSet.valueOf(new long[]{nextBitset++});
final List<Map<String, Object>> myMaps = filterByBitset(maps, bitset);
return Collections2.transform(
Collections2.permutations(myMaps), new Function<List<Map<String, Object>>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<Map<String, Object>> input)
{
return new Object[]{input, input, null};
}
}
);
}
@Override
public void remove()
{
throw new UOE("Remove not suported");
}
};
}
}
),
// Second iterable tests combinations of the maps which may or may not be equal
Iterables.<Object[]>concat(
new Iterable<Iterable<Object[]>>()
{
@Override
public Iterator<Iterable<Object[]>> iterator()
{
return new Iterator<Iterable<Object[]>>()
{
long nextMap1Bits = 1L;
@Override
public boolean hasNext()
{
return nextMap1Bits < (1L << maps.size());
}
@Override
public Iterable<Object[]> next()
{
final BitSet bitset1 = BitSet.valueOf(new long[]{nextMap1Bits++});
final List<Map<String, Object>> maplist1 = filterByBitset(maps, bitset1);
return new Iterable<Object[]>()
{
@Override
public Iterator<Object[]> iterator()
{
return new Iterator<Object[]>()
{
long nextMap2Bits = 1L;
@Override
public boolean hasNext()
{
return nextMap2Bits < (1L << maps.size());
}
@Override
public Object[] next()
{
final BitSet bitset2 = BitSet.valueOf(new long[]{nextMap2Bits++});
return new Object[]{
maplist1,
filterByBitset(maps, bitset2),
nextMap2Bits == nextMap1Bits ? null : SegmentValidationException.class
};
}
@Override
public void remove()
{
throw new UOE("remove not supported");
}
};
}
};
}
@Override
public void remove()
{
throw new UOE("Remove not supported");
}
};
}
}
)
);
}
private final Collection<Map<String, Object>> events1;
private final Collection<Map<String, Object>> events2;
private final Class<? extends Exception> exception;
public IndexIOTest(
Collection<Map<String, Object>> events1,
Collection<Map<String, Object>> events2,
Class<? extends Exception> exception
)
{
this.events1 = events1;
this.events2 = events2;
this.exception = exception;
}
final IncrementalIndex<Aggregator> incrementalIndex1 = new OnheapIncrementalIndex(
DEFAULT_INTERVAL.getStart().getMillis(),
QueryGranularity.NONE,
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
},
1000000
);
final IncrementalIndex<Aggregator> incrementalIndex2 = new OnheapIncrementalIndex(
DEFAULT_INTERVAL.getStart().getMillis(),
QueryGranularity.NONE,
new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
},
1000000
);
IndexableAdapter adapter1;
IndexableAdapter adapter2;
@Before
public void setUp() throws IndexSizeExceededException
{
long timestamp = 0L;
for (Map<String, Object> event : events1) {
incrementalIndex1.add(new MapBasedInputRow(timestamp++, Lists.newArrayList(event.keySet()), event));
}
timestamp = 0L;
for (Map<String, Object> event : events2) {
incrementalIndex2.add(new MapBasedInputRow(timestamp++, Lists.newArrayList(event.keySet()), event));
}
adapter2 = new IncrementalIndexAdapter(
DEFAULT_INTERVAL,
incrementalIndex2,
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
);
adapter1 = new IncrementalIndexAdapter(
DEFAULT_INTERVAL,
incrementalIndex1,
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
);
}
@Test
public void testRowValidatorEquals() throws Exception
{
Exception ex = null;
try {
IndexIO.DefaultIndexIOHandler.validateTwoSegments(adapter1, adapter2);
}
catch (Exception e) {
ex = e;
}
if (exception != null) {
Assert.assertNotNull("Exception was not thrown", ex);
if (!exception.isAssignableFrom(ex.getClass())) {
throw ex;
}
} else {
if (ex != null) {
throw ex;
}
}
} }
} }

View File

@ -0,0 +1,253 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class IndexMakerTest
{
private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis();
private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{
new CountAggregatorFactory(
"count"
)
};
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
new ConciseBitmapSerdeFactory(),
CompressedObjectStrategy.CompressionStrategy.LZ4
);
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");
private static final Function<Collection<Map<String, Object>>, Object[]> OBJECT_MAKER = new Function<Collection<Map<String, Object>>, Object[]>()
{
@Nullable
@Override
public Object[] apply(Collection<Map<String, Object>> input)
{
final ArrayList<InputRow> list = new ArrayList<>();
int i = 0;
for (final Map<String, Object> map : input) {
list.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map));
}
return new Object[]{list};
}
};
@SafeVarargs
public static Collection<Object[]> permute(Map<String, Object>... maps)
{
if (maps == null) {
return ImmutableList.<Object[]>of();
}
return Collections2.transform(
Collections2.permutations(
Arrays.asList(maps)
),
OBJECT_MAKER
);
}
@Parameterized.Parameters
public static Iterable<Object[]> paramFeeder()
{
final Map<String, Object> map1 = ImmutableMap.<String, Object>of(
DIMS.get(0), ImmutableList.<String>of("dim00", "dim01"),
DIMS.get(1), "dim10"
);
final List<String> nullList = Collections.singletonList(null);
final Map<String, Object> map2 = ImmutableMap.<String, Object>of(
DIMS.get(0), nullList,
DIMS.get(1), "dim10"
);
final Map<String, Object> map3 = ImmutableMap.<String, Object>of(
DIMS.get(0),
ImmutableList.<String>of("dim00", "dim01")
);
final Map<String, Object> map4 = ImmutableMap.<String, Object>of();
final Map<String, Object> map5 = ImmutableMap.<String, Object>of(DIMS.get(1), "dim10");
final Map<String, Object> map6 = new HashMap<>();
map6.put(DIMS.get(1), null); // ImmutableMap cannot take null
return Iterables.<Object[]>concat(
permute(map1)
, permute(map1, map4)
, permute(map1, map5)
, permute(map5, map6)
, permute(map4, map5)
, Iterables.transform(ImmutableList.of(Arrays.asList(map1, map2, map3, map4, map5, map6)), OBJECT_MAKER)
);
}
private final Collection<InputRow> events;
public IndexMakerTest(
final Collection<InputRow> events
)
{
this.events = events;
}
IncrementalIndex toPersist;
File tmpDir;
File persistTmpDir;
@Before
public void setUp() throws IOException
{
toPersist = new OnheapIncrementalIndex(
JodaUtils.MIN_INSTANT,
QueryGranularity.NONE,
DEFAULT_AGG_FACTORIES,
1000000
);
for (InputRow event : events) {
toPersist.add(event);
}
tmpDir = Files.createTempDir();
persistTmpDir = new File(tmpDir, "persistDir");
IndexMerger.persist(toPersist, persistTmpDir, INDEX_SPEC);
}
@After
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void testSimpleReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
Assert.assertEquals(events.size(), adapter.getNumRows());
reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
private File reprocessAndValidate(File inDir, File tmpDir) throws IOException
{
final File outDir = IndexMaker.convert(
inDir,
tmpDir,
INDEX_SPEC
);
IndexIO.DefaultIndexIOHandler.validateTwoSegments(persistTmpDir, outDir);
return outDir;
}
private File appendAndValidate(File inDir, File tmpDir) throws IOException
{
final File outDir = IndexMerger.append(
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir))),
tmpDir,
INDEX_SPEC
);
IndexIO.DefaultIndexIOHandler.validateTwoSegments(persistTmpDir, outDir);
return outDir;
}
@Test
public void testIdempotentReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
reprocessAndValidate(persistTmpDir, tmpDir1);
final File tmpDir2 = new File(tmpDir, "reprocessed2");
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
Assert.assertEquals(events.size(), adapter2.getNumRows());
reprocessAndValidate(tmpDir1, tmpDir2);
final File tmpDir3 = new File(tmpDir, "reprocessed3");
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
Assert.assertEquals(events.size(), adapter3.getNumRows());
reprocessAndValidate(tmpDir2, tmpDir3);
}
@Test
public void testSimpleAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
Assert.assertEquals(events.size(), adapter.getNumRows());
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
@Test
public void testIdempotentAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
appendAndValidate(persistTmpDir, tmpDir1);
final File tmpDir2 = new File(tmpDir, "reprocessed2");
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
Assert.assertEquals(events.size(), adapter2.getNumRows());
appendAndValidate(tmpDir1, tmpDir2);
final File tmpDir3 = new File(tmpDir, "reprocessed3");
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
Assert.assertEquals(events.size(), adapter3.getNumRows());
appendAndValidate(tmpDir2, tmpDir3);
}
}

View File

@ -36,7 +36,7 @@ public class GenericIndexedTest
@Test(expected = UnsupportedOperationException.class) @Test(expected = UnsupportedOperationException.class)
public void testNotSortedNoIndexOf() throws Exception public void testNotSortedNoIndexOf() throws Exception
{ {
GenericIndexed.fromArray(new String[]{"a", "c", "b"}, GenericIndexed.stringStrategy).indexOf("a"); GenericIndexed.fromArray(new String[]{"a", "c", "b"}, GenericIndexed.STRING_STRATEGY).indexOf("a");
} }
@Test(expected = UnsupportedOperationException.class) @Test(expected = UnsupportedOperationException.class)
@ -44,7 +44,7 @@ public class GenericIndexedTest
{ {
serializeAndDeserialize( serializeAndDeserialize(
GenericIndexed.fromArray( GenericIndexed.fromArray(
new String[]{"a", "c", "b"}, GenericIndexed.stringStrategy new String[]{"a", "c", "b"}, GenericIndexed.STRING_STRATEGY
) )
).indexOf("a"); ).indexOf("a");
} }
@ -53,7 +53,7 @@ public class GenericIndexedTest
public void testSanity() throws Exception public void testSanity() throws Exception
{ {
final String[] strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"}; final String[] strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"};
Indexed<String> indexed = GenericIndexed.fromArray(strings, GenericIndexed.stringStrategy); Indexed<String> indexed = GenericIndexed.fromArray(strings, GenericIndexed.STRING_STRATEGY);
Assert.assertEquals(strings.length, indexed.size()); Assert.assertEquals(strings.length, indexed.size());
for (int i = 0; i < strings.length; i++) { for (int i = 0; i < strings.length; i++) {
@ -81,7 +81,7 @@ public class GenericIndexedTest
GenericIndexed<String> deserialized = serializeAndDeserialize( GenericIndexed<String> deserialized = serializeAndDeserialize(
GenericIndexed.fromArray( GenericIndexed.fromArray(
strings, GenericIndexed.stringStrategy strings, GenericIndexed.STRING_STRATEGY
) )
); );
@ -114,7 +114,7 @@ public class GenericIndexedTest
final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray()); final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
Assert.assertEquals(indexed.getSerializedSize(), byteBuffer.remaining()); Assert.assertEquals(indexed.getSerializedSize(), byteBuffer.remaining());
GenericIndexed<String> deserialized = GenericIndexed.read( GenericIndexed<String> deserialized = GenericIndexed.read(
byteBuffer, GenericIndexed.stringStrategy byteBuffer, GenericIndexed.STRING_STRATEGY
); );
Assert.assertEquals(0, byteBuffer.remaining()); Assert.assertEquals(0, byteBuffer.remaining());
return deserialized; return deserialized;

View File

@ -0,0 +1,110 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.filter;
import io.druid.segment.Rowboat;
import org.junit.Assert;
import org.junit.Test;
/**
*
*/
public class RowboatTest
{
@Test
public void testRowboatCompare()
{
Rowboat rb1 = new Rowboat(12345L, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
Rowboat rb2 = new Rowboat(12345L, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
Assert.assertEquals(0, rb1.compareTo(rb2));
Rowboat rb3 = new Rowboat(12345L, new int[][]{new int[]{3}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
Assert.assertNotEquals(0, rb1.compareTo(rb3));
}
@Test
public void testBiggerCompare()
{
Rowboat rb1 = new Rowboat(
0,
new int[][]{
new int[]{0},
new int[]{138},
new int[]{44},
new int[]{374},
new int[]{0},
new int[]{0},
new int[]{552},
new int[]{338},
new int[]{910},
new int[]{25570},
new int[]{9},
new int[]{0},
new int[]{0},
new int[]{0}
},
new Object[]{1.0, 47.0, "someMetric"},
0
);
Rowboat rb2 = new Rowboat(
0,
new int[][]{
new int[]{0},
new int[]{138},
new int[]{44},
new int[]{374},
new int[]{0},
new int[]{0},
new int[]{553},
new int[]{338},
new int[]{910},
new int[]{25580},
new int[]{9},
new int[]{0},
new int[]{0},
new int[]{0}
},
new Object[]{1.0, 47.0, "someMetric"},
0
);
Assert.assertNotEquals(0, rb1.compareTo(rb2));
}
@Test
public void testToString()
{
Assert.assertEquals(
"Rowboat{timestamp=1970-01-01T00:00:00.000Z, dims=[[1], [2]], metrics=[someMetric], comprisedRows={}}",
new Rowboat(0, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{"someMetric"}, 5).toString()
);
}
@Test
public void testLotsONullString()
{
Assert.assertEquals(
"Rowboat{timestamp=1970-01-01T00:00:00.000Z, dims=null, metrics=null, comprisedRows={}}",
new Rowboat(0, null, null, 5).toString()
);
}
}