mirror of https://github.com/apache/druid.git
Cleanup some code in index creation.
* Add some unit tests * Add io.druid.segment.IndexMerger.reprocess for quick re-indexing of data * Add dim-value validation to validation checker (instead of ONLY index #) * General code refactoring to make things a little easier to read
This commit is contained in:
parent
f5943ed494
commit
f2300430d1
|
@ -65,13 +65,13 @@ import io.druid.segment.data.CompressedLongsIndexedSupplier;
|
||||||
import io.druid.segment.data.CompressedObjectStrategy;
|
import io.druid.segment.data.CompressedObjectStrategy;
|
||||||
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
|
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
|
||||||
import io.druid.segment.data.GenericIndexed;
|
import io.druid.segment.data.GenericIndexed;
|
||||||
|
import io.druid.segment.data.Indexed;
|
||||||
import io.druid.segment.data.IndexedInts;
|
import io.druid.segment.data.IndexedInts;
|
||||||
import io.druid.segment.data.IndexedIterable;
|
import io.druid.segment.data.IndexedIterable;
|
||||||
import io.druid.segment.data.IndexedMultivalue;
|
import io.druid.segment.data.IndexedMultivalue;
|
||||||
import io.druid.segment.data.IndexedRTree;
|
import io.druid.segment.data.IndexedRTree;
|
||||||
import io.druid.segment.data.VSizeIndexed;
|
import io.druid.segment.data.VSizeIndexed;
|
||||||
import io.druid.segment.data.VSizeIndexedInts;
|
import io.druid.segment.data.VSizeIndexedInts;
|
||||||
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
|
||||||
import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
|
import io.druid.segment.serde.BitmapIndexColumnPartSupplier;
|
||||||
import io.druid.segment.serde.ComplexColumnPartSerde;
|
import io.druid.segment.serde.ComplexColumnPartSerde;
|
||||||
import io.druid.segment.serde.ComplexColumnPartSupplier;
|
import io.druid.segment.serde.ComplexColumnPartSupplier;
|
||||||
|
@ -82,7 +82,6 @@ import io.druid.segment.serde.FloatGenericColumnSupplier;
|
||||||
import io.druid.segment.serde.LongGenericColumnPartSerde;
|
import io.druid.segment.serde.LongGenericColumnPartSerde;
|
||||||
import io.druid.segment.serde.LongGenericColumnSupplier;
|
import io.druid.segment.serde.LongGenericColumnSupplier;
|
||||||
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
|
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
|
||||||
import org.joda.time.DateTime;
|
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -94,7 +93,6 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.util.AbstractList;
|
import java.util.AbstractList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -203,7 +201,8 @@ public class IndexIO
|
||||||
return convertSegment(toConvert, converted, indexSpec, false);
|
return convertSegment(toConvert, converted, indexSpec, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent) throws IOException
|
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent)
|
||||||
|
throws IOException
|
||||||
{
|
{
|
||||||
final int version = SegmentUtils.getVersionFromDir(toConvert);
|
final int version = SegmentUtils.getVersionFromDir(toConvert);
|
||||||
|
|
||||||
|
@ -230,13 +229,8 @@ public class IndexIO
|
||||||
DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
|
DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
|
||||||
return true;
|
return true;
|
||||||
default:
|
default:
|
||||||
if(forceIfCurrent){
|
if (forceIfCurrent) {
|
||||||
final QueryableIndexIndexableAdapter indexIndexableAdapter = new QueryableIndexIndexableAdapter(loadIndex(toConvert));
|
IndexMaker.convert(toConvert, converted, indexSpec);
|
||||||
IndexMaker.append(
|
|
||||||
Collections.<IndexableAdapter>singletonList(indexIndexableAdapter),
|
|
||||||
converted,
|
|
||||||
indexSpec
|
|
||||||
);
|
|
||||||
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
|
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -251,6 +245,112 @@ public class IndexIO
|
||||||
public MMappedIndex mapDir(File inDir) throws IOException;
|
public MMappedIndex mapDir(File inDir) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void validateRowValues(
|
||||||
|
Rowboat rb1,
|
||||||
|
IndexableAdapter adapter1,
|
||||||
|
Rowboat rb2,
|
||||||
|
IndexableAdapter adapter2
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final int[][] dims1 = rb1.getDims();
|
||||||
|
final int[][] dims2 = rb2.getDims();
|
||||||
|
if (dims1.length != dims2.length) {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim lengths not equal %s vs %s",
|
||||||
|
Arrays.deepToString(dims1),
|
||||||
|
Arrays.deepToString(dims2)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
final Indexed<String> dim1Names = adapter1.getDimensionNames();
|
||||||
|
final Indexed<String> dim2Names = adapter2.getDimensionNames();
|
||||||
|
for (int i = 0; i < dims1.length; ++i) {
|
||||||
|
final int[] dim1Vals = dims1[i];
|
||||||
|
final int[] dim2Vals = dims2[i];
|
||||||
|
final String dim1Name = dim1Names.get(i);
|
||||||
|
final String dim2Name = dim2Names.get(i);
|
||||||
|
final Indexed<String> dim1ValNames = adapter1.getDimValueLookup(dim1Name);
|
||||||
|
final Indexed<String> dim2ValNames = adapter2.getDimValueLookup(dim2Name);
|
||||||
|
|
||||||
|
if (dim1Vals == null || dim2Vals == null) {
|
||||||
|
if (dim1Vals != dim2Vals) {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Expected nulls, found %s and %s",
|
||||||
|
Arrays.toString(dim1Vals),
|
||||||
|
Arrays.toString(dim2Vals)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (dim1Vals.length != dim2Vals.length) {
|
||||||
|
// Might be OK if one of them has null. This occurs in IndexMakerTest
|
||||||
|
if (dim1Vals.length == 0 && dim2Vals.length == 1) {
|
||||||
|
final String dimValName = dim2ValNames.get(dim2Vals[0]);
|
||||||
|
if (dimValName == null) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim [%s] value [%s] is not null",
|
||||||
|
dim2Name,
|
||||||
|
dimValName
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else if (dim2Vals.length == 0 && dim1Vals.length == 1) {
|
||||||
|
final String dimValName = dim1ValNames.get(dim1Vals[0]);
|
||||||
|
if (dimValName == null) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim [%s] value [%s] is not null",
|
||||||
|
dim1Name,
|
||||||
|
dimValName
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim [%s] value lengths not equal. Expected %d found %d",
|
||||||
|
dim1Name,
|
||||||
|
dims1.length,
|
||||||
|
dims2.length
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int j = 0; j < Math.max(dim1Vals.length, dim2Vals.length); ++j) {
|
||||||
|
final int dIdex1 = dim1Vals.length <= j ? -1 : dim1Vals[j];
|
||||||
|
final int dIdex2 = dim2Vals.length <= j ? -1 : dim2Vals[j];
|
||||||
|
|
||||||
|
if (dIdex1 == dIdex2) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String dim1ValName = dIdex1 < 0 ? null : dim1ValNames.get(dIdex1);
|
||||||
|
final String dim2ValName = dIdex2 < 0 ? null : dim2ValNames.get(dIdex2);
|
||||||
|
if ((dim1ValName == null) || (dim2ValName == null)) {
|
||||||
|
if ((dim1ValName == null) && (dim2ValName == null)) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim [%s] value not equal. Expected [%s] found [%s]",
|
||||||
|
dim1Name,
|
||||||
|
dim1ValName,
|
||||||
|
dim2ValName
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!dim1ValName.equals(dim2ValName)) {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dim [%s] value not equal. Expected [%s] found [%s]",
|
||||||
|
dim1Name,
|
||||||
|
dim1ValName,
|
||||||
|
dim2ValName
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class DefaultIndexIOHandler implements IndexIOHandler
|
public static class DefaultIndexIOHandler implements IndexIOHandler
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(DefaultIndexIOHandler.class);
|
private static final Logger log = new Logger(DefaultIndexIOHandler.class);
|
||||||
|
@ -278,10 +378,10 @@ public class IndexIO
|
||||||
|
|
||||||
indexBuffer.get(); // Skip the version byte
|
indexBuffer.get(); // Skip the version byte
|
||||||
final GenericIndexed<String> availableDimensions = GenericIndexed.read(
|
final GenericIndexed<String> availableDimensions = GenericIndexed.read(
|
||||||
indexBuffer, GenericIndexed.stringStrategy
|
indexBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
|
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
|
||||||
indexBuffer, GenericIndexed.stringStrategy
|
indexBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
|
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
|
||||||
final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
|
final BitmapSerdeFactory bitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
|
||||||
|
@ -315,7 +415,7 @@ public class IndexIO
|
||||||
fileDimensionName
|
fileDimensionName
|
||||||
);
|
);
|
||||||
|
|
||||||
dimValueLookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.stringStrategy));
|
dimValueLookups.put(dimension, GenericIndexed.read(dimBuffer, GenericIndexed.STRING_STRATEGY));
|
||||||
dimColumns.put(dimension, VSizeIndexed.readFromByteBuffer(dimBuffer));
|
dimColumns.put(dimension, VSizeIndexed.readFromByteBuffer(dimBuffer));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,30 +459,68 @@ public class IndexIO
|
||||||
|
|
||||||
public static void validateTwoSegments(File dir1, File dir2) throws IOException
|
public static void validateTwoSegments(File dir1, File dir2) throws IOException
|
||||||
{
|
{
|
||||||
final QueryableIndexIndexableAdapter adapter1 = new QueryableIndexIndexableAdapter(loadIndex(dir1));
|
validateTwoSegments(
|
||||||
final QueryableIndexIndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(loadIndex(dir2));
|
new QueryableIndexIndexableAdapter(loadIndex(dir1)),
|
||||||
if(adapter1.getNumRows() != adapter2.getNumRows()){
|
new QueryableIndexIndexableAdapter(loadIndex(dir2))
|
||||||
throw new IOException("Validation failure - Row count mismatch");
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2)
|
||||||
|
{
|
||||||
|
if (adapter1.getNumRows() != adapter2.getNumRows()) {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Row count mismatch. Expected [%d] found [%d]",
|
||||||
|
adapter1.getNumRows(),
|
||||||
|
adapter2.getNumRows()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
final Set<String> dimNames1 = Sets.newHashSet(adapter1.getDimensionNames());
|
||||||
|
final Set<String> dimNames2 = Sets.newHashSet(adapter2.getDimensionNames());
|
||||||
|
if (!dimNames1.equals(dimNames2)) {
|
||||||
|
throw new SegmentValidationException(
|
||||||
|
"Dimension names differ. Expected [%s] found [%s]",
|
||||||
|
dimNames1,
|
||||||
|
dimNames2
|
||||||
|
);
|
||||||
|
}
|
||||||
|
final Set<String> metNames1 = Sets.newHashSet(adapter1.getMetricNames());
|
||||||
|
final Set<String> metNames2 = Sets.newHashSet(adapter2.getMetricNames());
|
||||||
|
if (!metNames1.equals(metNames2)) {
|
||||||
|
throw new SegmentValidationException("Metric names differ. Expected [%s] found [%s]", metNames1, metNames2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
final Iterator<Rowboat> it1 = adapter1.getRows().iterator();
|
final Iterator<Rowboat> it1 = adapter1.getRows().iterator();
|
||||||
final Iterator<Rowboat> it2 = adapter2.getRows().iterator();
|
final Iterator<Rowboat> it2 = adapter2.getRows().iterator();
|
||||||
long row = 0L;
|
long row = 0L;
|
||||||
while(it1.hasNext()){
|
while (it1.hasNext()) {
|
||||||
if(it1.hasNext() ^ it2.hasNext()){
|
if (!it2.hasNext()) {
|
||||||
throw new IOException("Validation failure - Iterator doesn't have enough");
|
throw new SegmentValidationException("Unexpected end of second adapter");
|
||||||
}
|
}
|
||||||
final Rowboat rb1 = it1.next();
|
final Rowboat rb1 = it1.next();
|
||||||
final Rowboat rb2 = it2.next();
|
final Rowboat rb2 = it2.next();
|
||||||
++row;
|
++row;
|
||||||
if(rb1.compareTo(rb2) != 0){
|
if (rb1.getRowNum() != rb2.getRowNum()) {
|
||||||
throw new IOException(String.format("Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2));
|
throw new SegmentValidationException("Row number mismatch: [%d] vs [%d]", rb1.getRowNum(), rb2.getRowNum());
|
||||||
|
}
|
||||||
|
if (rb1.compareTo(rb2) != 0) {
|
||||||
|
try {
|
||||||
|
validateRowValues(rb1, adapter1, rb2, adapter2);
|
||||||
|
}
|
||||||
|
catch (SegmentValidationException ex) {
|
||||||
|
throw new SegmentValidationException(ex, "Validation failure on row %d: [%s] vs [%s]", row, rb1, rb2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(it2.hasNext()){
|
if (it2.hasNext()) {
|
||||||
throw new IOException("Validation failure - Iterator still has more");
|
throw new SegmentValidationException("Unexpected end of first adapter");
|
||||||
}
|
}
|
||||||
if(row != adapter1.getNumRows()){
|
if (row != adapter1.getNumRows()) {
|
||||||
throw new IOException("Validation failure - Actual Row count mismatch");
|
throw new SegmentValidationException(
|
||||||
|
"Actual Row count mismatch. Expected [%d] found [%d]",
|
||||||
|
row,
|
||||||
|
adapter1.getNumRows()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,7 +593,7 @@ public class IndexIO
|
||||||
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
|
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
|
||||||
|
|
||||||
GenericIndexed<String> dictionary = GenericIndexed.read(
|
GenericIndexed<String> dictionary = GenericIndexed.read(
|
||||||
dimBuffer, GenericIndexed.stringStrategy
|
dimBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
|
|
||||||
if (dictionary.size() == 0) {
|
if (dictionary.size() == 0) {
|
||||||
|
@ -503,7 +641,7 @@ public class IndexIO
|
||||||
|
|
||||||
dictionary = GenericIndexed.fromIterable(
|
dictionary = GenericIndexed.fromIterable(
|
||||||
Iterables.concat(nullList, dictionary),
|
Iterables.concat(nullList, dictionary),
|
||||||
GenericIndexed.stringStrategy
|
GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
|
|
||||||
bitmaps = GenericIndexed.fromIterable(
|
bitmaps = GenericIndexed.fromIterable(
|
||||||
|
@ -574,8 +712,11 @@ public class IndexIO
|
||||||
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
|
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if(compressionStrategy != null) {
|
if (compressionStrategy != null) {
|
||||||
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension);
|
log.info(
|
||||||
|
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
|
||||||
|
dimension
|
||||||
|
);
|
||||||
}
|
}
|
||||||
columnPartBuilder.withMultiValuedColumn(multiValCol);
|
columnPartBuilder.withMultiValuedColumn(multiValCol);
|
||||||
}
|
}
|
||||||
|
@ -670,7 +811,7 @@ public class IndexIO
|
||||||
|
|
||||||
indexBuffer.get(); // Skip the version byte
|
indexBuffer.get(); // Skip the version byte
|
||||||
final GenericIndexed<String> dims8 = GenericIndexed.read(
|
final GenericIndexed<String> dims8 = GenericIndexed.read(
|
||||||
indexBuffer, GenericIndexed.stringStrategy
|
indexBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
|
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
|
||||||
Iterables.filter(
|
Iterables.filter(
|
||||||
|
@ -683,10 +824,10 @@ public class IndexIO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
GenericIndexed.stringStrategy
|
GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
|
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
|
||||||
indexBuffer, GenericIndexed.stringStrategy
|
indexBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
|
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
|
||||||
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
|
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
|
||||||
|
@ -698,7 +839,7 @@ public class IndexIO
|
||||||
columns.addAll(Lists.newArrayList(dims9));
|
columns.addAll(Lists.newArrayList(dims9));
|
||||||
columns.addAll(Lists.newArrayList(availableMetrics));
|
columns.addAll(Lists.newArrayList(availableMetrics));
|
||||||
|
|
||||||
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.stringStrategy);
|
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
|
||||||
|
|
||||||
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
|
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
|
||||||
|
|
||||||
|
@ -840,8 +981,8 @@ public class IndexIO
|
||||||
* Index.drd should consist of the segment version, the columns and dimensions of the segment as generic
|
* Index.drd should consist of the segment version, the columns and dimensions of the segment as generic
|
||||||
* indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type.
|
* indexes, the interval start and end millis as longs (in 16 bytes), and a bitmap index type.
|
||||||
*/
|
*/
|
||||||
final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy);
|
final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
|
||||||
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy);
|
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
|
||||||
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
|
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
|
||||||
final BitmapSerdeFactory segmentBitmapSerdeFactory;
|
final BitmapSerdeFactory segmentBitmapSerdeFactory;
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -92,6 +92,7 @@ import java.nio.LongBuffer;
|
||||||
import java.util.AbstractList;
|
import java.util.AbstractList;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -321,6 +322,37 @@ public class IndexMaker
|
||||||
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
|
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
|
||||||
|
{
|
||||||
|
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static File convert(
|
||||||
|
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
|
||||||
|
) throws IOException
|
||||||
|
{
|
||||||
|
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir));
|
||||||
|
return makeIndexFiles(
|
||||||
|
ImmutableList.of(adapter),
|
||||||
|
outDir,
|
||||||
|
progress,
|
||||||
|
Lists.newArrayList(adapter.getDimensionNames()),
|
||||||
|
Lists.newArrayList(adapter.getMetricNames()),
|
||||||
|
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
|
||||||
|
{
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
|
||||||
|
{
|
||||||
|
return input.get(0);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
indexSpec
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static File append(
|
public static File append(
|
||||||
final List<IndexableAdapter> adapters,
|
final List<IndexableAdapter> adapters,
|
||||||
final File outDir,
|
final File outDir,
|
||||||
|
@ -504,7 +536,16 @@ public class IndexMaker
|
||||||
makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec);
|
makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec);
|
||||||
|
|
||||||
progress.progress();
|
progress.progress();
|
||||||
makeIndexBinary(v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec);
|
makeIndexBinary(
|
||||||
|
v9Smoosher,
|
||||||
|
adapters,
|
||||||
|
outDir,
|
||||||
|
mergedDimensions,
|
||||||
|
mergedMetrics,
|
||||||
|
skippedDimensions,
|
||||||
|
progress,
|
||||||
|
indexSpec
|
||||||
|
);
|
||||||
|
|
||||||
v9Smoosher.close();
|
v9Smoosher.close();
|
||||||
|
|
||||||
|
@ -796,6 +837,33 @@ public class IndexMaker
|
||||||
progress.stopSection(dimSection);
|
progress.stopSection(dimSection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class NullsAtZeroConvertingIntList extends AbstractList<Integer>
|
||||||
|
{
|
||||||
|
private final List<Integer> delegate;
|
||||||
|
private final boolean delegateHasNullAtZero;
|
||||||
|
NullsAtZeroConvertingIntList(List<Integer> delegate, final boolean delegateHasNullAtZero)
|
||||||
|
{
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.delegateHasNullAtZero = delegateHasNullAtZero;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer get(int index)
|
||||||
|
{
|
||||||
|
Integer val = delegate.get(index);
|
||||||
|
if (val == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return delegateHasNullAtZero ? val : val + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size()
|
||||||
|
{
|
||||||
|
return delegate.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void makeDimColumn(
|
private static void makeDimColumn(
|
||||||
final FileSmoosher v9Smoosher,
|
final FileSmoosher v9Smoosher,
|
||||||
final List<IndexableAdapter> adapters,
|
final List<IndexableAdapter> adapters,
|
||||||
|
@ -860,7 +928,7 @@ public class IndexMaker
|
||||||
final Iterable<String> dimensionValues = dimensionValuesLookup.get(dimension);
|
final Iterable<String> dimensionValues = dimensionValuesLookup.get(dimension);
|
||||||
GenericIndexed<String> dictionary = GenericIndexed.fromIterable(
|
GenericIndexed<String> dictionary = GenericIndexed.fromIterable(
|
||||||
dimensionValues,
|
dimensionValues,
|
||||||
GenericIndexed.stringStrategy
|
GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
boolean bumpDictionary = false;
|
boolean bumpDictionary = false;
|
||||||
|
|
||||||
|
@ -873,127 +941,67 @@ public class IndexMaker
|
||||||
bumpDictionary = true;
|
bumpDictionary = true;
|
||||||
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
|
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
|
||||||
|
|
||||||
final List<String> nullList = Lists.newArrayList();
|
|
||||||
nullList.add(null);
|
|
||||||
|
|
||||||
dictionary = GenericIndexed.fromIterable(
|
dictionary = GenericIndexed.fromIterable(
|
||||||
Iterables.concat(nullList, dimensionValues),
|
Iterables.concat(Collections.<String>singleton(null), dimensionValues),
|
||||||
GenericIndexed.stringStrategy
|
GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
|
|
||||||
final int dictionarySize = dictionary.size();
|
final int dictionarySize = dictionary.size();
|
||||||
|
|
||||||
singleValCol = null;
|
singleValCol = null;
|
||||||
multiValCol = VSizeIndexed.fromIterable(
|
multiValCol = VSizeIndexed.fromIterable(
|
||||||
FunctionalIterable
|
Iterables.transform(
|
||||||
.create(vals)
|
vals,
|
||||||
.transform(
|
new Function<List<Integer>, VSizeIndexedInts>()
|
||||||
new Function<List<Integer>, VSizeIndexedInts>()
|
{
|
||||||
{
|
@Override
|
||||||
@Override
|
public VSizeIndexedInts apply(final List<Integer> input)
|
||||||
public VSizeIndexedInts apply(final List<Integer> input)
|
{
|
||||||
{
|
if (input == null) {
|
||||||
if (input == null) {
|
return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
|
||||||
return VSizeIndexedInts.fromList(
|
} else {
|
||||||
new AbstractList<Integer>()
|
return VSizeIndexedInts.fromList(new NullsAtZeroConvertingIntList(input, false), dictionarySize);
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Integer get(int index)
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int size()
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}, dictionarySize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return VSizeIndexedInts.fromList(
|
|
||||||
new AbstractList<Integer>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Integer get(int index)
|
|
||||||
{
|
|
||||||
Integer val = input.get(index);
|
|
||||||
if (val == null) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return val + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int size()
|
|
||||||
{
|
|
||||||
return input.size();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
dictionarySize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
)
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
final int dictionarySize = dictionary.size();
|
final int dictionarySize = dictionary.size();
|
||||||
singleValCol = null;
|
singleValCol = null;
|
||||||
multiValCol = VSizeIndexed.fromIterable(
|
multiValCol = VSizeIndexed.fromIterable(
|
||||||
FunctionalIterable
|
Iterables.transform(
|
||||||
.create(vals)
|
vals,
|
||||||
.transform(
|
new Function<List<Integer>, VSizeIndexedInts>()
|
||||||
new Function<List<Integer>, VSizeIndexedInts>()
|
{
|
||||||
{
|
@Override
|
||||||
@Override
|
public VSizeIndexedInts apply(List<Integer> input)
|
||||||
public VSizeIndexedInts apply(List<Integer> input)
|
{
|
||||||
{
|
if (input == null) {
|
||||||
if (input == null) {
|
return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
|
||||||
return VSizeIndexedInts.fromList(
|
} else {
|
||||||
new AbstractList<Integer>()
|
return VSizeIndexedInts.fromList(input, dictionarySize);
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Integer get(int index)
|
|
||||||
{
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int size()
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}, dictionarySize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return VSizeIndexedInts.fromList(
|
|
||||||
input,
|
|
||||||
dictionarySize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
)
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
final int dictionarySize = dictionary.size();
|
final int dictionarySize = dictionary.size();
|
||||||
singleValCol = null;
|
singleValCol = null;
|
||||||
multiValCol = VSizeIndexed.fromIterable(
|
multiValCol = VSizeIndexed.fromIterable(
|
||||||
FunctionalIterable
|
Iterables.transform(
|
||||||
.create(vals)
|
vals,
|
||||||
.transform(
|
new Function<List<Integer>, VSizeIndexedInts>()
|
||||||
new Function<List<Integer>, VSizeIndexedInts>()
|
{
|
||||||
{
|
@Override
|
||||||
@Override
|
public VSizeIndexedInts apply(List<Integer> input)
|
||||||
public VSizeIndexedInts apply(List<Integer> input)
|
{
|
||||||
{
|
return VSizeIndexedInts.fromList(input, dictionarySize);
|
||||||
return VSizeIndexedInts.fromList(
|
}
|
||||||
input,
|
}
|
||||||
dictionarySize
|
)
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1011,47 +1019,13 @@ public class IndexMaker
|
||||||
|
|
||||||
dictionary = GenericIndexed.fromIterable(
|
dictionary = GenericIndexed.fromIterable(
|
||||||
Iterables.concat(nullList, dimensionValues),
|
Iterables.concat(nullList, dimensionValues),
|
||||||
GenericIndexed.stringStrategy
|
GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
multiValCol = null;
|
multiValCol = null;
|
||||||
singleValCol = new AbstractList<Integer>()
|
singleValCol = new NullsAtZeroConvertingIntList(vals, false);
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Integer get(int index)
|
|
||||||
{
|
|
||||||
Integer val = vals.get(index);
|
|
||||||
if (val == null) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return val + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int size()
|
|
||||||
{
|
|
||||||
return vals.size();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} else {
|
} else {
|
||||||
multiValCol = null;
|
multiValCol = null;
|
||||||
singleValCol = new AbstractList<Integer>()
|
singleValCol = new NullsAtZeroConvertingIntList(vals, true);
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public Integer get(int index)
|
|
||||||
{
|
|
||||||
Integer val = vals.get(index);
|
|
||||||
if (val == null) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int size()
|
|
||||||
{
|
|
||||||
return vals.size();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
multiValCol = null;
|
multiValCol = null;
|
||||||
|
@ -1214,8 +1188,11 @@ public class IndexMaker
|
||||||
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
|
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if(compressionStrategy != null) {
|
if (compressionStrategy != null) {
|
||||||
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension);
|
log.info(
|
||||||
|
"Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]",
|
||||||
|
dimension
|
||||||
|
);
|
||||||
}
|
}
|
||||||
dimPartBuilder.withMultiValuedColumn(multiValCol);
|
dimPartBuilder.withMultiValuedColumn(multiValCol);
|
||||||
}
|
}
|
||||||
|
@ -1399,8 +1376,8 @@ public class IndexMaker
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.stringStrategy);
|
GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.STRING_STRATEGY);
|
||||||
GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.stringStrategy);
|
GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.STRING_STRATEGY);
|
||||||
|
|
||||||
final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory());
|
final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory());
|
||||||
final long numBytes = cols.getSerializedSize()
|
final long numBytes = cols.getSerializedSize()
|
||||||
|
@ -1634,7 +1611,7 @@ public class IndexMaker
|
||||||
{
|
{
|
||||||
int[][] dims = input.getDims();
|
int[][] dims = input.getDims();
|
||||||
int[][] newDims = new int[convertedDims.size()][];
|
int[][] newDims = new int[convertedDims.size()][];
|
||||||
for (int i = 0; i < convertedDims.size(); ++i) {
|
for (int i = 0; i < newDims.length; ++i) {
|
||||||
IntBuffer converter = converters.get(convertedDims.get(i));
|
IntBuffer converter = converters.get(convertedDims.get(i));
|
||||||
|
|
||||||
if (converter == null) {
|
if (converter == null) {
|
||||||
|
|
|
@ -144,13 +144,22 @@ public class IndexMerger
|
||||||
*
|
*
|
||||||
* @throws java.io.IOException if an IO error occurs persisting the index
|
* @throws java.io.IOException if an IO error occurs persisting the index
|
||||||
*/
|
*/
|
||||||
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec) throws IOException
|
public static File persist(
|
||||||
|
final IncrementalIndex index,
|
||||||
|
final Interval dataInterval,
|
||||||
|
File outDir,
|
||||||
|
IndexSpec indexSpec
|
||||||
|
) throws IOException
|
||||||
{
|
{
|
||||||
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
|
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static File persist(
|
public static File persist(
|
||||||
final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec, ProgressIndicator progress
|
final IncrementalIndex index,
|
||||||
|
final Interval dataInterval,
|
||||||
|
File outDir,
|
||||||
|
IndexSpec indexSpec,
|
||||||
|
ProgressIndicator progress
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
if (index.isEmpty()) {
|
if (index.isEmpty()) {
|
||||||
|
@ -199,7 +208,11 @@ public class IndexMerger
|
||||||
}
|
}
|
||||||
|
|
||||||
public static File mergeQueryableIndex(
|
public static File mergeQueryableIndex(
|
||||||
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress
|
List<QueryableIndex> indexes,
|
||||||
|
final AggregatorFactory[] metricAggs,
|
||||||
|
File outDir,
|
||||||
|
IndexSpec indexSpec,
|
||||||
|
ProgressIndicator progress
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
return merge(
|
return merge(
|
||||||
|
@ -229,7 +242,11 @@ public class IndexMerger
|
||||||
}
|
}
|
||||||
|
|
||||||
public static File merge(
|
public static File merge(
|
||||||
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress
|
List<IndexableAdapter> indexes,
|
||||||
|
final AggregatorFactory[] metricAggs,
|
||||||
|
File outDir,
|
||||||
|
IndexSpec indexSpec,
|
||||||
|
ProgressIndicator progress
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
FileUtils.deleteDirectory(outDir);
|
FileUtils.deleteDirectory(outDir);
|
||||||
|
@ -453,8 +470,8 @@ public class IndexMerger
|
||||||
channel = fileOutputStream.getChannel();
|
channel = fileOutputStream.getChannel();
|
||||||
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
|
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
|
||||||
|
|
||||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy).writeToChannel(channel);
|
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
|
||||||
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy).writeToChannel(channel);
|
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
|
||||||
|
|
||||||
DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
|
DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
|
||||||
DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
|
DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
|
||||||
|
@ -492,7 +509,7 @@ public class IndexMerger
|
||||||
|
|
||||||
for (String dimension : mergedDimensions) {
|
for (String dimension : mergedDimensions) {
|
||||||
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
|
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
|
||||||
ioPeon, dimension, GenericIndexed.stringStrategy
|
ioPeon, dimension, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
writer.open();
|
writer.open();
|
||||||
|
|
||||||
|
@ -762,7 +779,7 @@ public class IndexMerger
|
||||||
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
|
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
|
||||||
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
|
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
|
||||||
}
|
}
|
||||||
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
|
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
|
||||||
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
|
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
|
||||||
|
|
||||||
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
|
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
|
||||||
|
@ -875,8 +892,8 @@ public class IndexMerger
|
||||||
createIndexDrdFile(
|
createIndexDrdFile(
|
||||||
IndexIO.V8_VERSION,
|
IndexIO.V8_VERSION,
|
||||||
v8OutDir,
|
v8OutDir,
|
||||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
|
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
|
||||||
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
|
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
|
||||||
dataInterval,
|
dataInterval,
|
||||||
indexSpec.getBitmapSerdeFactory()
|
indexSpec.getBitmapSerdeFactory()
|
||||||
);
|
);
|
||||||
|
|
|
@ -17,13 +17,19 @@
|
||||||
|
|
||||||
package io.druid.segment;
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
@ -129,8 +135,8 @@ public class Rowboat implements Comparable<Rowboat>
|
||||||
{
|
{
|
||||||
return "Rowboat{" +
|
return "Rowboat{" +
|
||||||
"timestamp=" + new DateTime(timestamp).toString() +
|
"timestamp=" + new DateTime(timestamp).toString() +
|
||||||
", dims=" + (dims == null ? null : Arrays.asList(dims)) +
|
", dims=" + Arrays.deepToString(dims) +
|
||||||
", metrics=" + (metrics == null ? null : Arrays.asList(metrics)) +
|
", metrics=" + Arrays.toString(metrics) +
|
||||||
", comprisedRows=" + comprisedRows +
|
", comprisedRows=" + comprisedRows +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class SegmentValidationException extends RuntimeException
|
||||||
|
{
|
||||||
|
public SegmentValidationException(String formatText, Object... arguments)
|
||||||
|
{
|
||||||
|
super(String.format(formatText, arguments));
|
||||||
|
}
|
||||||
|
|
||||||
|
public SegmentValidationException(Throwable cause, String formatText, Object... arguments)
|
||||||
|
{
|
||||||
|
super(String.format(formatText, arguments), cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -319,7 +319,7 @@ public class GenericIndexed<T> implements Indexed<T>
|
||||||
throw new IAE("Unknown version[%s]", versionFromBuffer);
|
throw new IAE("Unknown version[%s]", versionFromBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ObjectStrategy<String> stringStrategy = new CacheableObjectStrategy<String>()
|
public static final ObjectStrategy<String> STRING_STRATEGY = new CacheableObjectStrategy<String>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Class<? extends String> getClazz()
|
public Class<? extends String> getClazz()
|
||||||
|
|
|
@ -45,11 +45,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
|
private static final Logger log = new Logger(IncrementalIndexAdapter.class);
|
||||||
private final Interval dataInterval;
|
private final Interval dataInterval;
|
||||||
private final IncrementalIndex<Object> index;
|
private final IncrementalIndex<?> index;
|
||||||
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
|
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
|
||||||
|
|
||||||
public IncrementalIndexAdapter(
|
public IncrementalIndexAdapter(
|
||||||
Interval dataInterval, IncrementalIndex<Object> index, BitmapFactory bitmapFactory
|
Interval dataInterval, IncrementalIndex<?> index, BitmapFactory bitmapFactory
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
this.dataInterval = dataInterval;
|
this.dataInterval = dataInterval;
|
||||||
|
|
|
@ -317,7 +317,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
|
||||||
throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
|
throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
|
||||||
}
|
}
|
||||||
|
|
||||||
final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.stringStrategy);
|
final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.STRING_STRATEGY);
|
||||||
builder.setType(ValueType.STRING);
|
builder.setType(ValueType.STRING);
|
||||||
|
|
||||||
final WritableSupplier<IndexedInts> rSingleValuedColumn;
|
final WritableSupplier<IndexedInts> rSingleValuedColumn;
|
||||||
|
|
|
@ -17,16 +17,291 @@
|
||||||
|
|
||||||
package io.druid.segment;
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.metamx.common.UOE;
|
||||||
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.query.aggregation.Aggregator;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.segment.data.CompressedObjectStrategy;
|
||||||
|
import io.druid.segment.data.ConciseBitmapSerdeFactory;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndexAdapter;
|
||||||
|
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||||
|
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
|
import org.joda.time.Interval;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Ignore;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.BitSet;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is mostly a test of the validator
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class IndexIOTest
|
public class IndexIOTest
|
||||||
{
|
{
|
||||||
@Test @Ignore // this test depends on static fields, so it has to be tested independently
|
private static Interval DEFAULT_INTERVAL = Interval.parse("1970-01-01/2000-01-01");
|
||||||
public void testInjector() throws Exception
|
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
|
||||||
|
new ConciseBitmapSerdeFactory(),
|
||||||
|
CompressedObjectStrategy.CompressionStrategy.LZ4
|
||||||
|
);
|
||||||
|
|
||||||
|
private static <T> List<T> filterByBitset(List<T> list, BitSet bitSet)
|
||||||
{
|
{
|
||||||
System.setProperty("druid.processing.columnCache.sizeBytes", "1234");
|
final ArrayList<T> outList = new ArrayList<>(bitSet.cardinality());
|
||||||
Assert.assertEquals(1234, IndexIO.columnConfig.columnCacheSizeBytes());
|
for (int i = 0; i < list.size(); ++i) {
|
||||||
|
if (bitSet.get(i)) {
|
||||||
|
outList.add(list.get(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return outList;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Iterable<Object[]> constructionFeeder()
|
||||||
|
{
|
||||||
|
|
||||||
|
final Map<String, Object> map = ImmutableMap.<String, Object>of();
|
||||||
|
|
||||||
|
final Map<String, Object> map00 = ImmutableMap.<String, Object>of(
|
||||||
|
"dim0", ImmutableList.<String>of("dim00", "dim01")
|
||||||
|
);
|
||||||
|
final Map<String, Object> map10 = ImmutableMap.<String, Object>of(
|
||||||
|
"dim1", "dim10"
|
||||||
|
);
|
||||||
|
final Map<String, Object> map0null = new HashMap<>();
|
||||||
|
map0null.put("dim0", null);
|
||||||
|
|
||||||
|
final Map<String, Object> map1null = new HashMap<>();
|
||||||
|
map1null.put("dim1", null);
|
||||||
|
|
||||||
|
final Map<String, Object> mapAll = ImmutableMap.<String, Object>of(
|
||||||
|
"dim0", ImmutableList.<String>of("dim00", "dim01"),
|
||||||
|
"dim1", "dim10"
|
||||||
|
);
|
||||||
|
|
||||||
|
final List<Map<String, Object>> maps = ImmutableList.of(
|
||||||
|
map, map00, map10, map0null, map1null, mapAll
|
||||||
|
);
|
||||||
|
|
||||||
|
return Iterables.<Object[]>concat(
|
||||||
|
// First iterable tests permutations of the maps which are expected to be equal
|
||||||
|
Iterables.<Object[]>concat(
|
||||||
|
new Iterable<Iterable<Object[]>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<Iterable<Object[]>> iterator()
|
||||||
|
{
|
||||||
|
return new Iterator<Iterable<Object[]>>()
|
||||||
|
{
|
||||||
|
long nextBitset = 1L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return nextBitset < (1L << maps.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterable<Object[]> next()
|
||||||
|
{
|
||||||
|
final BitSet bitset = BitSet.valueOf(new long[]{nextBitset++});
|
||||||
|
final List<Map<String, Object>> myMaps = filterByBitset(maps, bitset);
|
||||||
|
return Collections2.transform(
|
||||||
|
Collections2.permutations(myMaps), new Function<List<Map<String, Object>>, Object[]>()
|
||||||
|
{
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Object[] apply(List<Map<String, Object>> input)
|
||||||
|
{
|
||||||
|
return new Object[]{input, input, null};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UOE("Remove not suported");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
// Second iterable tests combinations of the maps which may or may not be equal
|
||||||
|
Iterables.<Object[]>concat(
|
||||||
|
new Iterable<Iterable<Object[]>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<Iterable<Object[]>> iterator()
|
||||||
|
{
|
||||||
|
return new Iterator<Iterable<Object[]>>()
|
||||||
|
{
|
||||||
|
long nextMap1Bits = 1L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return nextMap1Bits < (1L << maps.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterable<Object[]> next()
|
||||||
|
{
|
||||||
|
final BitSet bitset1 = BitSet.valueOf(new long[]{nextMap1Bits++});
|
||||||
|
final List<Map<String, Object>> maplist1 = filterByBitset(maps, bitset1);
|
||||||
|
return new Iterable<Object[]>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<Object[]> iterator()
|
||||||
|
{
|
||||||
|
return new Iterator<Object[]>()
|
||||||
|
{
|
||||||
|
long nextMap2Bits = 1L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return nextMap2Bits < (1L << maps.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object[] next()
|
||||||
|
{
|
||||||
|
|
||||||
|
final BitSet bitset2 = BitSet.valueOf(new long[]{nextMap2Bits++});
|
||||||
|
return new Object[]{
|
||||||
|
maplist1,
|
||||||
|
filterByBitset(maps, bitset2),
|
||||||
|
nextMap2Bits == nextMap1Bits ? null : SegmentValidationException.class
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UOE("remove not supported");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UOE("Remove not supported");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Collection<Map<String, Object>> events1;
|
||||||
|
private final Collection<Map<String, Object>> events2;
|
||||||
|
private final Class<? extends Exception> exception;
|
||||||
|
|
||||||
|
public IndexIOTest(
|
||||||
|
Collection<Map<String, Object>> events1,
|
||||||
|
Collection<Map<String, Object>> events2,
|
||||||
|
Class<? extends Exception> exception
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.events1 = events1;
|
||||||
|
this.events2 = events2;
|
||||||
|
this.exception = exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
final IncrementalIndex<Aggregator> incrementalIndex1 = new OnheapIncrementalIndex(
|
||||||
|
DEFAULT_INTERVAL.getStart().getMillis(),
|
||||||
|
QueryGranularity.NONE,
|
||||||
|
new AggregatorFactory[]{
|
||||||
|
new CountAggregatorFactory(
|
||||||
|
"count"
|
||||||
|
)
|
||||||
|
},
|
||||||
|
1000000
|
||||||
|
);
|
||||||
|
|
||||||
|
final IncrementalIndex<Aggregator> incrementalIndex2 = new OnheapIncrementalIndex(
|
||||||
|
DEFAULT_INTERVAL.getStart().getMillis(),
|
||||||
|
QueryGranularity.NONE,
|
||||||
|
new AggregatorFactory[]{
|
||||||
|
new CountAggregatorFactory(
|
||||||
|
"count"
|
||||||
|
)
|
||||||
|
},
|
||||||
|
1000000
|
||||||
|
);
|
||||||
|
|
||||||
|
IndexableAdapter adapter1;
|
||||||
|
IndexableAdapter adapter2;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IndexSizeExceededException
|
||||||
|
{
|
||||||
|
long timestamp = 0L;
|
||||||
|
for (Map<String, Object> event : events1) {
|
||||||
|
incrementalIndex1.add(new MapBasedInputRow(timestamp++, Lists.newArrayList(event.keySet()), event));
|
||||||
|
}
|
||||||
|
|
||||||
|
timestamp = 0L;
|
||||||
|
for (Map<String, Object> event : events2) {
|
||||||
|
incrementalIndex2.add(new MapBasedInputRow(timestamp++, Lists.newArrayList(event.keySet()), event));
|
||||||
|
}
|
||||||
|
|
||||||
|
adapter2 = new IncrementalIndexAdapter(
|
||||||
|
DEFAULT_INTERVAL,
|
||||||
|
incrementalIndex2,
|
||||||
|
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
|
||||||
|
);
|
||||||
|
|
||||||
|
adapter1 = new IncrementalIndexAdapter(
|
||||||
|
DEFAULT_INTERVAL,
|
||||||
|
incrementalIndex1,
|
||||||
|
INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRowValidatorEquals() throws Exception
|
||||||
|
{
|
||||||
|
Exception ex = null;
|
||||||
|
try {
|
||||||
|
IndexIO.DefaultIndexIOHandler.validateTwoSegments(adapter1, adapter2);
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
ex = e;
|
||||||
|
}
|
||||||
|
if (exception != null) {
|
||||||
|
Assert.assertNotNull("Exception was not thrown", ex);
|
||||||
|
if (!exception.isAssignableFrom(ex.getClass())) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (ex != null) {
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,253 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment;
|
||||||
|
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Collections2;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
import io.druid.common.utils.JodaUtils;
|
||||||
|
import io.druid.data.input.InputRow;
|
||||||
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
|
import io.druid.granularity.QueryGranularity;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import io.druid.segment.data.CompressedObjectStrategy;
|
||||||
|
import io.druid.segment.data.ConciseBitmapSerdeFactory;
|
||||||
|
import io.druid.segment.incremental.IncrementalIndex;
|
||||||
|
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class IndexMakerTest
|
||||||
|
{
|
||||||
|
private static final long TIMESTAMP = DateTime.parse("2014-01-01").getMillis();
|
||||||
|
private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{
|
||||||
|
new CountAggregatorFactory(
|
||||||
|
"count"
|
||||||
|
)
|
||||||
|
};
|
||||||
|
private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
|
||||||
|
new ConciseBitmapSerdeFactory(),
|
||||||
|
CompressedObjectStrategy.CompressionStrategy.LZ4
|
||||||
|
);
|
||||||
|
private static final List<String> DIMS = ImmutableList.of("dim0", "dim1");
|
||||||
|
|
||||||
|
private static final Function<Collection<Map<String, Object>>, Object[]> OBJECT_MAKER = new Function<Collection<Map<String, Object>>, Object[]>()
|
||||||
|
{
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public Object[] apply(Collection<Map<String, Object>> input)
|
||||||
|
{
|
||||||
|
final ArrayList<InputRow> list = new ArrayList<>();
|
||||||
|
int i = 0;
|
||||||
|
for (final Map<String, Object> map : input) {
|
||||||
|
list.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map));
|
||||||
|
}
|
||||||
|
return new Object[]{list};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@SafeVarargs
|
||||||
|
public static Collection<Object[]> permute(Map<String, Object>... maps)
|
||||||
|
{
|
||||||
|
if (maps == null) {
|
||||||
|
return ImmutableList.<Object[]>of();
|
||||||
|
}
|
||||||
|
return Collections2.transform(
|
||||||
|
Collections2.permutations(
|
||||||
|
Arrays.asList(maps)
|
||||||
|
),
|
||||||
|
OBJECT_MAKER
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Iterable<Object[]> paramFeeder()
|
||||||
|
{
|
||||||
|
final Map<String, Object> map1 = ImmutableMap.<String, Object>of(
|
||||||
|
DIMS.get(0), ImmutableList.<String>of("dim00", "dim01"),
|
||||||
|
DIMS.get(1), "dim10"
|
||||||
|
);
|
||||||
|
|
||||||
|
final List<String> nullList = Collections.singletonList(null);
|
||||||
|
|
||||||
|
final Map<String, Object> map2 = ImmutableMap.<String, Object>of(
|
||||||
|
DIMS.get(0), nullList,
|
||||||
|
DIMS.get(1), "dim10"
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
final Map<String, Object> map3 = ImmutableMap.<String, Object>of(
|
||||||
|
DIMS.get(0),
|
||||||
|
ImmutableList.<String>of("dim00", "dim01")
|
||||||
|
);
|
||||||
|
|
||||||
|
final Map<String, Object> map4 = ImmutableMap.<String, Object>of();
|
||||||
|
|
||||||
|
final Map<String, Object> map5 = ImmutableMap.<String, Object>of(DIMS.get(1), "dim10");
|
||||||
|
|
||||||
|
final Map<String, Object> map6 = new HashMap<>();
|
||||||
|
map6.put(DIMS.get(1), null); // ImmutableMap cannot take null
|
||||||
|
|
||||||
|
|
||||||
|
return Iterables.<Object[]>concat(
|
||||||
|
permute(map1)
|
||||||
|
, permute(map1, map4)
|
||||||
|
, permute(map1, map5)
|
||||||
|
, permute(map5, map6)
|
||||||
|
, permute(map4, map5)
|
||||||
|
, Iterables.transform(ImmutableList.of(Arrays.asList(map1, map2, map3, map4, map5, map6)), OBJECT_MAKER)
|
||||||
|
);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Collection<InputRow> events;
|
||||||
|
|
||||||
|
public IndexMakerTest(
|
||||||
|
final Collection<InputRow> events
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.events = events;
|
||||||
|
}
|
||||||
|
|
||||||
|
IncrementalIndex toPersist;
|
||||||
|
File tmpDir;
|
||||||
|
File persistTmpDir;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException
|
||||||
|
{
|
||||||
|
toPersist = new OnheapIncrementalIndex(
|
||||||
|
JodaUtils.MIN_INSTANT,
|
||||||
|
QueryGranularity.NONE,
|
||||||
|
DEFAULT_AGG_FACTORIES,
|
||||||
|
1000000
|
||||||
|
);
|
||||||
|
for (InputRow event : events) {
|
||||||
|
toPersist.add(event);
|
||||||
|
}
|
||||||
|
tmpDir = Files.createTempDir();
|
||||||
|
persistTmpDir = new File(tmpDir, "persistDir");
|
||||||
|
IndexMerger.persist(toPersist, persistTmpDir, INDEX_SPEC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException
|
||||||
|
{
|
||||||
|
FileUtils.deleteDirectory(tmpDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleReprocess() throws IOException
|
||||||
|
{
|
||||||
|
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||||
|
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||||
|
reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private File reprocessAndValidate(File inDir, File tmpDir) throws IOException
|
||||||
|
{
|
||||||
|
final File outDir = IndexMaker.convert(
|
||||||
|
inDir,
|
||||||
|
tmpDir,
|
||||||
|
INDEX_SPEC
|
||||||
|
);
|
||||||
|
IndexIO.DefaultIndexIOHandler.validateTwoSegments(persistTmpDir, outDir);
|
||||||
|
return outDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
private File appendAndValidate(File inDir, File tmpDir) throws IOException
|
||||||
|
{
|
||||||
|
final File outDir = IndexMerger.append(
|
||||||
|
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir))),
|
||||||
|
tmpDir,
|
||||||
|
INDEX_SPEC
|
||||||
|
);
|
||||||
|
IndexIO.DefaultIndexIOHandler.validateTwoSegments(persistTmpDir, outDir);
|
||||||
|
return outDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIdempotentReprocess() throws IOException
|
||||||
|
{
|
||||||
|
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||||
|
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||||
|
final File tmpDir1 = new File(tmpDir, "reprocessed1");
|
||||||
|
reprocessAndValidate(persistTmpDir, tmpDir1);
|
||||||
|
|
||||||
|
final File tmpDir2 = new File(tmpDir, "reprocessed2");
|
||||||
|
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
|
||||||
|
Assert.assertEquals(events.size(), adapter2.getNumRows());
|
||||||
|
reprocessAndValidate(tmpDir1, tmpDir2);
|
||||||
|
|
||||||
|
final File tmpDir3 = new File(tmpDir, "reprocessed3");
|
||||||
|
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
|
||||||
|
Assert.assertEquals(events.size(), adapter3.getNumRows());
|
||||||
|
reprocessAndValidate(tmpDir2, tmpDir3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleAppend() throws IOException
|
||||||
|
{
|
||||||
|
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||||
|
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||||
|
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIdempotentAppend() throws IOException
|
||||||
|
{
|
||||||
|
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(persistTmpDir));
|
||||||
|
Assert.assertEquals(events.size(), adapter.getNumRows());
|
||||||
|
final File tmpDir1 = new File(tmpDir, "reprocessed1");
|
||||||
|
appendAndValidate(persistTmpDir, tmpDir1);
|
||||||
|
|
||||||
|
final File tmpDir2 = new File(tmpDir, "reprocessed2");
|
||||||
|
final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir1));
|
||||||
|
Assert.assertEquals(events.size(), adapter2.getNumRows());
|
||||||
|
appendAndValidate(tmpDir1, tmpDir2);
|
||||||
|
|
||||||
|
final File tmpDir3 = new File(tmpDir, "reprocessed3");
|
||||||
|
final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(tmpDir2));
|
||||||
|
Assert.assertEquals(events.size(), adapter3.getNumRows());
|
||||||
|
appendAndValidate(tmpDir2, tmpDir3);
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,7 +36,7 @@ public class GenericIndexedTest
|
||||||
@Test(expected = UnsupportedOperationException.class)
|
@Test(expected = UnsupportedOperationException.class)
|
||||||
public void testNotSortedNoIndexOf() throws Exception
|
public void testNotSortedNoIndexOf() throws Exception
|
||||||
{
|
{
|
||||||
GenericIndexed.fromArray(new String[]{"a", "c", "b"}, GenericIndexed.stringStrategy).indexOf("a");
|
GenericIndexed.fromArray(new String[]{"a", "c", "b"}, GenericIndexed.STRING_STRATEGY).indexOf("a");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = UnsupportedOperationException.class)
|
@Test(expected = UnsupportedOperationException.class)
|
||||||
|
@ -44,7 +44,7 @@ public class GenericIndexedTest
|
||||||
{
|
{
|
||||||
serializeAndDeserialize(
|
serializeAndDeserialize(
|
||||||
GenericIndexed.fromArray(
|
GenericIndexed.fromArray(
|
||||||
new String[]{"a", "c", "b"}, GenericIndexed.stringStrategy
|
new String[]{"a", "c", "b"}, GenericIndexed.STRING_STRATEGY
|
||||||
)
|
)
|
||||||
).indexOf("a");
|
).indexOf("a");
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ public class GenericIndexedTest
|
||||||
public void testSanity() throws Exception
|
public void testSanity() throws Exception
|
||||||
{
|
{
|
||||||
final String[] strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"};
|
final String[] strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"};
|
||||||
Indexed<String> indexed = GenericIndexed.fromArray(strings, GenericIndexed.stringStrategy);
|
Indexed<String> indexed = GenericIndexed.fromArray(strings, GenericIndexed.STRING_STRATEGY);
|
||||||
|
|
||||||
Assert.assertEquals(strings.length, indexed.size());
|
Assert.assertEquals(strings.length, indexed.size());
|
||||||
for (int i = 0; i < strings.length; i++) {
|
for (int i = 0; i < strings.length; i++) {
|
||||||
|
@ -81,7 +81,7 @@ public class GenericIndexedTest
|
||||||
|
|
||||||
GenericIndexed<String> deserialized = serializeAndDeserialize(
|
GenericIndexed<String> deserialized = serializeAndDeserialize(
|
||||||
GenericIndexed.fromArray(
|
GenericIndexed.fromArray(
|
||||||
strings, GenericIndexed.stringStrategy
|
strings, GenericIndexed.STRING_STRATEGY
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ public class GenericIndexedTest
|
||||||
final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
|
final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
|
||||||
Assert.assertEquals(indexed.getSerializedSize(), byteBuffer.remaining());
|
Assert.assertEquals(indexed.getSerializedSize(), byteBuffer.remaining());
|
||||||
GenericIndexed<String> deserialized = GenericIndexed.read(
|
GenericIndexed<String> deserialized = GenericIndexed.read(
|
||||||
byteBuffer, GenericIndexed.stringStrategy
|
byteBuffer, GenericIndexed.STRING_STRATEGY
|
||||||
);
|
);
|
||||||
Assert.assertEquals(0, byteBuffer.remaining());
|
Assert.assertEquals(0, byteBuffer.remaining());
|
||||||
return deserialized;
|
return deserialized;
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment.filter;
|
||||||
|
|
||||||
|
import io.druid.segment.Rowboat;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class RowboatTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void testRowboatCompare()
|
||||||
|
{
|
||||||
|
Rowboat rb1 = new Rowboat(12345L, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
|
||||||
|
Rowboat rb2 = new Rowboat(12345L, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
|
||||||
|
Assert.assertEquals(0, rb1.compareTo(rb2));
|
||||||
|
|
||||||
|
Rowboat rb3 = new Rowboat(12345L, new int[][]{new int[]{3}, new int[]{2}}, new Object[]{new Integer(7)}, 5);
|
||||||
|
Assert.assertNotEquals(0, rb1.compareTo(rb3));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBiggerCompare()
|
||||||
|
{
|
||||||
|
Rowboat rb1 = new Rowboat(
|
||||||
|
0,
|
||||||
|
new int[][]{
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{138},
|
||||||
|
new int[]{44},
|
||||||
|
new int[]{374},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{552},
|
||||||
|
new int[]{338},
|
||||||
|
new int[]{910},
|
||||||
|
new int[]{25570},
|
||||||
|
new int[]{9},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0}
|
||||||
|
},
|
||||||
|
new Object[]{1.0, 47.0, "someMetric"},
|
||||||
|
0
|
||||||
|
);
|
||||||
|
|
||||||
|
Rowboat rb2 = new Rowboat(
|
||||||
|
0,
|
||||||
|
new int[][]{
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{138},
|
||||||
|
new int[]{44},
|
||||||
|
new int[]{374},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{553},
|
||||||
|
new int[]{338},
|
||||||
|
new int[]{910},
|
||||||
|
new int[]{25580},
|
||||||
|
new int[]{9},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0},
|
||||||
|
new int[]{0}
|
||||||
|
},
|
||||||
|
new Object[]{1.0, 47.0, "someMetric"},
|
||||||
|
0
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertNotEquals(0, rb1.compareTo(rb2));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testToString()
|
||||||
|
{
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Rowboat{timestamp=1970-01-01T00:00:00.000Z, dims=[[1], [2]], metrics=[someMetric], comprisedRows={}}",
|
||||||
|
new Rowboat(0, new int[][]{new int[]{1}, new int[]{2}}, new Object[]{"someMetric"}, 5).toString()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLotsONullString()
|
||||||
|
{
|
||||||
|
Assert.assertEquals(
|
||||||
|
"Rowboat{timestamp=1970-01-01T00:00:00.000Z, dims=null, metrics=null, comprisedRows={}}",
|
||||||
|
new Rowboat(0, null, null, 5).toString()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue