From 1cc162727ba4f5e0d972510dc9d1c8cb1532d445 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 11 Nov 2014 14:05:37 -0800 Subject: [PATCH] address code review --- .../druid/common/utils/SerializerUtils.java | 6 +++ .../indexing/common/task/AppendTask.java | 5 +-- .../druid/indexing/common/task/MergeTask.java | 4 +- pom.xml | 2 +- .../main/java/io/druid/segment/IndexIO.java | 38 +++++++++---------- .../java/io/druid/segment/IndexMaker.java | 5 +-- .../java/io/druid/segment/EmptyIndexTest.java | 2 +- ...dexMakerTest.java => IndexMergerTest.java} | 20 ++++++---- .../io/druid/segment/SchemalessIndex.java | 18 ++++----- .../io/druid/segment/SchemalessTestFull.java | 38 +++++++++++++++++++ .../filter/SpatialFilterBonusTest.java | 12 +++--- 11 files changed, 97 insertions(+), 53 deletions(-) rename processing/src/test/java/io/druid/segment/{IndexMakerTest.java => IndexMergerTest.java} (89%) diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java index 1f6edb43f4e..f5c077e539d 100644 --- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java +++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java @@ -21,6 +21,7 @@ package io.druid.common.utils; import com.google.common.io.ByteStreams; import com.google.common.io.OutputSupplier; +import com.google.common.primitives.Ints; import io.druid.collections.IntList; import java.io.IOException; @@ -262,4 +263,9 @@ public class SerializerUtils return retVal; } + + public int getSerializedStringByteSize(String str) + { + return Ints.BYTES + str.getBytes().length; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java index 5b6bd37db16..15f89e8cfbd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java @@ -27,7 +27,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexableAdapter; import io.druid.segment.QueryableIndexIndexableAdapter; import io.druid.segment.Rowboat; @@ -37,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Map; @@ -106,7 +105,7 @@ public class AppendTask extends MergeTaskBase ); } - return IndexMaker.append(adapters, outDir); + return IndexMerger.append(adapters, outDir); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java index d856d1505bc..9bca4a3eee5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java @@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; +import io.druid.segment.IndexMerger; import io.druid.segment.QueryableIndex; import io.druid.timeline.DataSegment; @@ -60,7 +60,7 @@ public class MergeTask extends MergeTaskBase public File merge(final Map segments, final File outDir) throws Exception { - return IndexMaker.mergeQueryableIndex( + return IndexMerger.mergeQueryableIndex( Lists.transform( ImmutableList.copyOf(segments.values()), new Function() diff --git a/pom.xml b/pom.xml index d335a3a03c2..c2aaf01b1c8 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ com.metamx bytebuffer-collections - 0.1.0-SNAPSHOT + 0.1.1 com.metamx diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 47c39f8847a..815c358bc46 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -36,6 +36,7 @@ import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Module; import com.metamx.collections.bitmap.BitmapFactory; +import com.metamx.collections.bitmap.ConciseBitmapFactory; import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.spatial.ImmutableRTree; @@ -61,6 +62,7 @@ import io.druid.segment.data.ArrayIndexed; import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.ByteBufferSerializer; import io.druid.segment.data.CompressedLongsIndexedSupplier; +import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.GenericIndexed; import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedRTree; @@ -203,7 +205,7 @@ public class IndexIO case 6: case 7: log.info("Old version, re-persisting."); - IndexMaker.append( + IndexMerger.append( Arrays.asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))), converted ); @@ -255,12 +257,7 @@ public class IndexIO indexBuffer, GenericIndexed.stringStrategy ); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); - final BitmapSerdeFactory segmentBitmapSerdeFactory; - if (indexBuffer.hasRemaining()) { - segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class); - } else { - segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY; - } + final BitmapSerdeFactory conciseBitmapSerdeFactory = new ConciseBitmapSerdeFactory(); CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), BYTE_ORDER @@ -299,7 +296,7 @@ public class IndexIO for (int i = 0; i < availableDimensions.size(); ++i) { bitmaps.put( serializerUtils.readString(invertedBuffer), - GenericIndexed.read(invertedBuffer, segmentBitmapSerdeFactory.getObjectStrategy()) + GenericIndexed.read(invertedBuffer, conciseBitmapSerdeFactory.getObjectStrategy()) ); } @@ -310,7 +307,7 @@ public class IndexIO serializerUtils.readString(spatialBuffer), ByteBufferSerializer.read( spatialBuffer, - new IndexedRTree.ImmutableRTreeObjectStrategy(segmentBitmapSerdeFactory.getBitmapFactory()) + new IndexedRTree.ImmutableRTreeObjectStrategy(conciseBitmapSerdeFactory.getBitmapFactory()) ) ); } @@ -614,12 +611,10 @@ public class IndexIO indexBuffer, GenericIndexed.stringStrategy ); final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); - final BitmapSerdeFactory segmentBitmapSerdeFactory; - if (indexBuffer.hasRemaining()) { - segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class); - } else { - segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY; - } + final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( + serializerUtils.readString(indexBuffer), + BitmapSerdeFactory.class + ); Set columns = Sets.newTreeSet(); columns.addAll(Lists.newArrayList(dims9)); @@ -630,9 +625,7 @@ public class IndexIO final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 - // size of segmentBitmapSerdeFactory - + 4 - + segmentBitmapSerdeFactoryString.getBytes().length; + + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); @@ -678,7 +671,7 @@ public class IndexIO ) .setBitmapIndex( new BitmapIndexColumnPartSupplier( - BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(), + new ConciseBitmapFactory(), index.getBitmapIndexes().get(dimension), index.getDimValueLookup(dimension) ) @@ -740,7 +733,7 @@ public class IndexIO index.getDataInterval(), new ArrayIndexed<>(cols, String.class), index.getAvailableDimensions(), - BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(), + new ConciseBitmapFactory(), columns, index.getFileMapper() ); @@ -767,6 +760,11 @@ public class IndexIO final GenericIndexed dims = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy); final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong()); final BitmapSerdeFactory segmentBitmapSerdeFactory; + /** + * This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap + * index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where + * this information is appended to the end of index.drd. + */ if (indexBuffer.hasRemaining()) { segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class); } else { diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java index e3051a5672f..8b304e77605 100644 --- a/processing/src/main/java/io/druid/segment/IndexMaker.java +++ b/processing/src/main/java/io/druid/segment/IndexMaker.java @@ -1376,9 +1376,8 @@ public class IndexMaker final long numBytes = cols.getSerializedSize() + dims.getSerializedSize() + 16 - // Size of bitmap serde factory - + 4 - + bitmapSerdeFactoryType.getBytes().length; + + serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType); + final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index dc59edab460..ae781ca626c 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -59,7 +59,7 @@ public class EmptyIndexTest emptyIndex, new ConciseBitmapFactory() ); - IndexMaker.merge( + IndexMerger.merge( Lists.newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java similarity index 89% rename from processing/src/test/java/io/druid/segment/IndexMakerTest.java rename to processing/src/test/java/io/druid/segment/IndexMergerTest.java index 4b39d515a6b..b940bf21ecf 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -39,8 +39,12 @@ import java.util.Arrays; /** */ -public class IndexMakerTest +public class IndexMergerTest { + static { + + } + @Test public void testPersistCaseInsensitive() throws Exception { @@ -50,7 +54,7 @@ public class IndexMakerTest final File tempDir = Files.createTempDir(); try { - QueryableIndex index = IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir)); + QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir)); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); @@ -89,20 +93,20 @@ public class IndexMakerTest final File tempDir2 = Files.createTempDir(); final File mergedDir = Files.createTempDir(); try { - QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1)); + QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1)); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(2, index1.getColumnNames().size()); - QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2)); + QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2)); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); Assert.assertEquals(2, index2.getColumnNames().size()); QueryableIndex merged = IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex( + IndexMerger.mergeQueryableIndex( Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir @@ -146,10 +150,10 @@ public class IndexMakerTest ) ); - final QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir1)); - final QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir2)); + final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1)); + final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2)); final QueryableIndex merged = IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3) + IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3) ); Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 823fd83774a..84283c8082e 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -179,11 +179,11 @@ public class SchemalessIndex mergedFile.mkdirs(); mergedFile.deleteOnExit(); - IndexMaker.persist(top, topFile); - IndexMaker.persist(bottom, bottomFile); + IndexMerger.persist(top, topFile); + IndexMerger.persist(bottom, bottomFile); mergedIndex = io.druid.segment.IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex( + IndexMerger.mergeQueryableIndex( Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile ) ); @@ -225,7 +225,7 @@ public class SchemalessIndex mergedFile.deleteOnExit(); QueryableIndex index = IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex( + IndexMerger.mergeQueryableIndex( Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile ) ); @@ -262,7 +262,7 @@ public class SchemalessIndex } QueryableIndex index = IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile) + IndexMerger.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile) ); return index; @@ -343,7 +343,7 @@ public class SchemalessIndex tmpFile.mkdirs(); tmpFile.deleteOnExit(); - IndexMaker.persist(rowIndex, tmpFile); + IndexMerger.persist(rowIndex, tmpFile); rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile)); } } @@ -403,7 +403,7 @@ public class SchemalessIndex theFile.mkdirs(); theFile.deleteOnExit(); filesToMap.add(theFile); - IndexMaker.persist(index, theFile); + IndexMerger.persist(index, theFile); } return filesToMap; @@ -463,7 +463,7 @@ public class SchemalessIndex ); } - return IndexIO.loadIndex(IndexMaker.append(adapters, mergedFile)); + return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile)); } catch (IOException e) { throw Throwables.propagate(e); @@ -482,7 +482,7 @@ public class SchemalessIndex List filesToMap = makeFilesToMap(tmpFile, files); return IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex( + IndexMerger.mergeQueryableIndex( Lists.newArrayList( Iterables.transform( filesToMap, diff --git a/processing/src/test/java/io/druid/segment/SchemalessTestFull.java b/processing/src/test/java/io/druid/segment/SchemalessTestFull.java index 29548d8b55e..23dac58ee4c 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessTestFull.java +++ b/processing/src/test/java/io/druid/segment/SchemalessTestFull.java @@ -1169,6 +1169,7 @@ public class SchemalessTestFull ) ); + /* Uncomment when Druid support for nulls/empty strings is actually consistent List> expectedTopNResults = Arrays.asList( new Result( new DateTime("2011-01-12T00:00:00.000Z"), @@ -1205,6 +1206,43 @@ public class SchemalessTestFull ) ) ); + */ + List> expectedTopNResults = Arrays.asList( + new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue( + Arrays.>asList( + ImmutableMap.builder() + .put("market", "spot") + .put("rows", 4L) + .put("index", 400.0D) + .put("addRowsIndexConstant", 405.0D) + .put("uniques", 0.0D) + .put("maxIndex", 100.0) + .put("minIndex", 100.0) + .build(), + ImmutableMap.builder() + .put("market", "") + .put("rows", 3L) + .put("index", 200.0D) + .put("addRowsIndexConstant", 204.0D) + .put("uniques", 0.0) + .put("maxIndex", 100.0) + .put("minIndex", 0.0) + .build(), + ImmutableMap.builder() + .put("market", "total_market") + .put("rows", 2L) + .put("index", 200.0D) + .put("addRowsIndexConstant", 203.0D) + .put("uniques", UNIQUES_1) + .put("maxIndex", 100.0) + .put("minIndex", 100.0) + .build() + ) + ) + ) + ); List> expectedFilteredTopNResults = Arrays.asList( new Result( diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 4e7ae7d3163..b4c94e18dfb 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -46,7 +46,7 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; +import io.druid.segment.IndexMerger; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; @@ -232,7 +232,7 @@ public class SpatialFilterBonusTest tmpFile.mkdirs(); tmpFile.deleteOnExit(); - IndexMaker.persist(theIndex, tmpFile); + IndexMerger.persist(theIndex, tmpFile); return IndexIO.loadIndex(tmpFile); } @@ -412,12 +412,12 @@ public class SpatialFilterBonusTest mergedFile.mkdirs(); mergedFile.deleteOnExit(); - IndexMaker.persist(first, DATA_INTERVAL, firstFile); - IndexMaker.persist(second, DATA_INTERVAL, secondFile); - IndexMaker.persist(third, DATA_INTERVAL, thirdFile); + IndexMerger.persist(first, DATA_INTERVAL, firstFile); + IndexMerger.persist(second, DATA_INTERVAL, secondFile); + IndexMerger.persist(third, DATA_INTERVAL, thirdFile); QueryableIndex mergedRealtime = IndexIO.loadIndex( - IndexMaker.mergeQueryableIndex( + IndexMerger.mergeQueryableIndex( Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)), METRIC_AGGS, mergedFile