address code review

This commit is contained in:
fjy 2014-11-11 14:05:37 -08:00
parent e6b7b03b5b
commit 1cc162727b
11 changed files with 97 additions and 53 deletions

View File

@ -21,6 +21,7 @@ package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import io.druid.collections.IntList;
import java.io.IOException;
@ -262,4 +263,9 @@ public class SerializerUtils
return retVal;
}
public int getSerializedStringByteSize(String str)
{
return Ints.BYTES + str.getBytes().length;
}
}

View File

@ -27,7 +27,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
@ -37,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;
@ -106,7 +105,7 @@ public class AppendTask extends MergeTaskBase
);
}
return IndexMaker.append(adapters, outDir);
return IndexMerger.append(adapters, outDir);
}
@Override

View File

@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.timeline.DataSegment;
@ -60,7 +60,7 @@ public class MergeTask extends MergeTaskBase
public File merge(final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
return IndexMaker.mergeQueryableIndex(
return IndexMerger.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),
new Function<File, QueryableIndex>()

View File

@ -90,7 +90,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
<version>0.1.0-SNAPSHOT</version>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -36,6 +36,7 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
@ -61,6 +62,7 @@ import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedRTree;
@ -203,7 +205,7 @@ public class IndexIO
case 6:
case 7:
log.info("Old version, re-persisting.");
IndexMaker.append(
IndexMerger.append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
converted
);
@ -255,12 +257,7 @@ public class IndexIO
indexBuffer, GenericIndexed.stringStrategy
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory;
if (indexBuffer.hasRemaining()) {
segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class);
} else {
segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY;
}
final BitmapSerdeFactory conciseBitmapSerdeFactory = new ConciseBitmapSerdeFactory();
CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), BYTE_ORDER
@ -299,7 +296,7 @@ public class IndexIO
for (int i = 0; i < availableDimensions.size(); ++i) {
bitmaps.put(
serializerUtils.readString(invertedBuffer),
GenericIndexed.read(invertedBuffer, segmentBitmapSerdeFactory.getObjectStrategy())
GenericIndexed.read(invertedBuffer, conciseBitmapSerdeFactory.getObjectStrategy())
);
}
@ -310,7 +307,7 @@ public class IndexIO
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer,
new IndexedRTree.ImmutableRTreeObjectStrategy(segmentBitmapSerdeFactory.getBitmapFactory())
new IndexedRTree.ImmutableRTreeObjectStrategy(conciseBitmapSerdeFactory.getBitmapFactory())
)
);
}
@ -614,12 +611,10 @@ public class IndexIO
indexBuffer, GenericIndexed.stringStrategy
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory;
if (indexBuffer.hasRemaining()) {
segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class);
} else {
segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY;
}
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
serializerUtils.readString(indexBuffer),
BitmapSerdeFactory.class
);
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
@ -630,9 +625,7 @@ public class IndexIO
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16
// size of segmentBitmapSerdeFactory
+ 4
+ segmentBitmapSerdeFactoryString.getBytes().length;
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
@ -678,7 +671,7 @@ public class IndexIO
)
.setBitmapIndex(
new BitmapIndexColumnPartSupplier(
BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(),
new ConciseBitmapFactory(),
index.getBitmapIndexes().get(dimension),
index.getDimValueLookup(dimension)
)
@ -740,7 +733,7 @@ public class IndexIO
index.getDataInterval(),
new ArrayIndexed<>(cols, String.class),
index.getAvailableDimensions(),
BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(),
new ConciseBitmapFactory(),
columns,
index.getFileMapper()
);
@ -767,6 +760,11 @@ public class IndexIO
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy);
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory;
/**
* This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap
* index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where
* this information is appended to the end of index.drd.
*/
if (indexBuffer.hasRemaining()) {
segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class);
} else {

View File

@ -1376,9 +1376,8 @@ public class IndexMaker
final long numBytes = cols.getSerializedSize()
+ dims.getSerializedSize()
+ 16
// Size of bitmap serde factory
+ 4
+ bitmapSerdeFactoryType.getBytes().length;
+ serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);

View File

@ -59,7 +59,7 @@ public class EmptyIndexTest
emptyIndex,
new ConciseBitmapFactory()
);
IndexMaker.merge(
IndexMerger.merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],
tmpDir

View File

@ -39,8 +39,12 @@ import java.util.Arrays;
/**
*/
public class IndexMakerTest
public class IndexMergerTest
{
static {
}
@Test
public void testPersistCaseInsensitive() throws Exception
{
@ -50,7 +54,7 @@ public class IndexMakerTest
final File tempDir = Files.createTempDir();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir));
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -89,20 +93,20 @@ public class IndexMakerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1));
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2));
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{},
mergedDir
@ -146,10 +150,10 @@ public class IndexMakerTest
)
);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir2));
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());

View File

@ -179,11 +179,11 @@ public class SchemalessIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMaker.persist(top, topFile);
IndexMaker.persist(bottom, bottomFile);
IndexMerger.persist(top, topFile);
IndexMerger.persist(bottom, bottomFile);
mergedIndex = io.druid.segment.IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile
)
);
@ -225,7 +225,7 @@ public class SchemalessIndex
mergedFile.deleteOnExit();
QueryableIndex index = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile
)
);
@ -262,7 +262,7 @@ public class SchemalessIndex
}
QueryableIndex index = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile)
IndexMerger.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile)
);
return index;
@ -343,7 +343,7 @@ public class SchemalessIndex
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMaker.persist(rowIndex, tmpFile);
IndexMerger.persist(rowIndex, tmpFile);
rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile));
}
}
@ -403,7 +403,7 @@ public class SchemalessIndex
theFile.mkdirs();
theFile.deleteOnExit();
filesToMap.add(theFile);
IndexMaker.persist(index, theFile);
IndexMerger.persist(index, theFile);
}
return filesToMap;
@ -463,7 +463,7 @@ public class SchemalessIndex
);
}
return IndexIO.loadIndex(IndexMaker.append(adapters, mergedFile));
return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile));
}
catch (IOException e) {
throw Throwables.propagate(e);
@ -482,7 +482,7 @@ public class SchemalessIndex
List<File> filesToMap = makeFilesToMap(tmpFile, files);
return IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Lists.newArrayList(
Iterables.transform(
filesToMap,

View File

@ -1169,6 +1169,7 @@ public class SchemalessTestFull
)
);
/* Uncomment when Druid support for nulls/empty strings is actually consistent
List<Result<TopNResultValue>> expectedTopNResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
@ -1205,6 +1206,43 @@ public class SchemalessTestFull
)
)
);
*/
List<Result<TopNResultValue>> expectedTopNResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("rows", 4L)
.put("index", 400.0D)
.put("addRowsIndexConstant", 405.0D)
.put("uniques", 0.0D)
.put("maxIndex", 100.0)
.put("minIndex", 100.0)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "")
.put("rows", 3L)
.put("index", 200.0D)
.put("addRowsIndexConstant", 204.0D)
.put("uniques", 0.0)
.put("maxIndex", 100.0)
.put("minIndex", 0.0)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("rows", 2L)
.put("index", 200.0D)
.put("addRowsIndexConstant", 203.0D)
.put("uniques", UNIQUES_1)
.put("maxIndex", 100.0)
.put("minIndex", 100.0)
.build()
)
)
)
);
List<Result<TopNResultValue>> expectedFilteredTopNResults = Arrays.asList(
new Result<TopNResultValue>(

View File

@ -46,7 +46,7 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -232,7 +232,7 @@ public class SpatialFilterBonusTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMaker.persist(theIndex, tmpFile);
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
@ -412,12 +412,12 @@ public class SpatialFilterBonusTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMaker.persist(first, DATA_INTERVAL, firstFile);
IndexMaker.persist(second, DATA_INTERVAL, secondFile);
IndexMaker.persist(third, DATA_INTERVAL, thirdFile);
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMaker.mergeQueryableIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile