Merge branch 'master' into yops-descriptor-dimensions

This commit is contained in:
Gian Merlino 2013-01-17 16:02:32 -08:00
commit 5ce53eb2ac
39 changed files with 627 additions and 211 deletions

View File

@@ -2,19 +2,11 @@
"queryType": "groupBy",
"dataSource": "twitterstream",
"granularity": "all",
- "dimensions": ["lang"],
+ "dimensions": ["lang", "utc_offset"],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"},
{ "type": "max", "fieldName": "max_statuses_count", "name": "theMaxStatusesCount"},
{ "type": "max", "fieldName": "max_retweet_count", "name": "theMaxRetweetCount"},
{ "type": "max", "fieldName": "max_friends_count", "name": "theMaxFriendsCount"},
{ "type": "max", "fieldName": "max_follower_count", "name": "theMaxFollowerCount"},
{ "type": "doubleSum", "fieldName": "total_statuses_count", "name": "total_tweets_all_time"}
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
],
"filter": { "type": "selector", "dimension": "lang", "value": "en" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@@ -1,25 +1,34 @@
package druid.examples.twitter;
+ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import org.codehaus.jackson.annotate.JsonCreator;
- import org.codehaus.jackson.annotate.JsonTypeName;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
- import twitter4j.*;
+ import org.codehaus.jackson.annotate.JsonTypeName;
+ import twitter4j.ConnectionLifeCycleListener;
+ import twitter4j.HashtagEntity;
+ import twitter4j.Status;
+ import twitter4j.StatusDeletionNotice;
+ import twitter4j.StatusListener;
+ import twitter4j.TwitterStream;
+ import twitter4j.TwitterStreamFactory;
+ import twitter4j.User;
import java.io.IOException;
+ import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
- import static java.lang.Thread.*;
+ import static java.lang.Thread.sleep;
/**
@@ -241,30 +250,26 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
} catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e);
}
//log.info("twitterStatus: "+ status.getCreatedAt() + " @" + status.getUser().getScreenName() + " - " + status.getText());//DEBUG
// theMap.put("twid", status.getUser().getScreenName());
// theMap.put("msg", status.getText()); // ToDo: verify encoding
HashtagEntity[] hts = status.getHashtagEntities();
if (hts != null && hts.length > 0) {
- // ToDo: get all the hash tags instead of just the first one
- theMap.put("htags", hts[0].getText());
- } else {
- theMap.put("htags", null);
+ List<String> hashTags = Lists.newArrayListWithExpectedSize(hts.length);
+ for (HashtagEntity ht : hts) {
+ hashTags.add(ht.getText());
+ }
+ theMap.put("htags", Arrays.asList(hashTags.get(0)));
}
long retweetCount = status.getRetweetCount();
theMap.put("retweet_count", retweetCount);
- User u = status.getUser();
- if (u != null) {
- theMap.put("follower_count", u.getFollowersCount());
- theMap.put("friends_count", u.getFriendsCount());
- theMap.put("lang", u.getLang());
- theMap.put("utc_offset", u.getUtcOffset()); // resolution in seconds, -1 if not available?
- theMap.put("statuses_count", u.getStatusesCount());
- } else {
- log.error("status.getUser() is null");
+ User user = status.getUser();
+ if (user != null) {
+ theMap.put("follower_count", user.getFollowersCount());
+ theMap.put("friends_count", user.getFriendsCount());
+ theMap.put("lang", user.getLang());
+ theMap.put("utc_offset", user.getUtcOffset()); // resolution in seconds, -1 if not available?
+ theMap.put("statuses_count", user.getStatusesCount());
}
return new MapBasedInputRow(status.getCreatedAt().getTime(), dimensions, theMap);

View File

@@ -31,8 +31,8 @@
"firehose": {
"type": "twitzer",
- "maxEventCount": 50000,
- "maxRunMinutes": 10
+ "maxEventCount": 500000,
+ "maxRunMinutes": 120
},
"plumber": {

View File

@@ -28,6 +28,7 @@ import org.joda.time.Interval;
public interface QueryableIndex extends ColumnSelector
{
public Interval getDataInterval();
+ public int getNumRows();
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
}
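
A minimal usage sketch (not part of this commit) of the new getNumRows() accessor, paired with IndexIO.loadIndex() as introduced elsewhere in this diff; the segment directory path is a placeholder.

import java.io.File;

import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;

public class NumRowsSketch
{
  public static void main(String[] args) throws Exception
  {
    // Load a segment directory (placeholder path) and read its row count
    // directly, instead of walking the time column by hand.
    QueryableIndex index = IndexIO.loadIndex(new File("/tmp/example-segment"));
    System.out.printf("rows: %,d%n", index.getNumRows());
  }
}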

View File

@@ -56,6 +56,12 @@ public class SimpleQueryableIndex implements QueryableIndex
return dataInterval;
}
+ @Override
+ public int getNumRows()
+ {
+ return timeColumn.getLength();
+ }
@Override
public Indexed<String> getColumnNames()
{

View File

@@ -25,6 +25,7 @@ public interface Column
{
public ColumnCapabilities getCapabilities();
+ public int getLength();
public DictionaryEncodedColumn getDictionaryEncoding();
public RunLengthColumn getRunLengthColumn();
public GenericColumn getGenericColumn();

View File

@@ -26,5 +26,6 @@ import java.io.Closeable;
public interface ComplexColumn extends Closeable
{
public Class<?> getClazz();
+ public String getTypeName();
public Object getRowValue(int rowNum);
}

View File

@@ -29,10 +29,12 @@ public class ComplexColumnImpl extends AbstractColumn
.setType(ValueType.COMPLEX);
private final Indexed column;
+ private final String typeName;
- public ComplexColumnImpl(Indexed column)
+ public ComplexColumnImpl(String typeName, Indexed column)
{
this.column = column;
+ this.typeName = typeName;
}
@Override
@@ -41,9 +43,15 @@ public class ComplexColumnImpl extends AbstractColumn
return CAPABILITIES;
}
+ @Override
+ public int getLength()
+ {
+ return column.size();
+ }
@Override
public ComplexColumn getComplexColumn()
{
- return new IndexedComplexColumn(column);
+ return new IndexedComplexColumn(typeName, column);
}
}

View File

@@ -26,6 +26,7 @@ import com.metamx.druid.kv.IndexedInts;
public interface DictionaryEncodedColumn
{
public int size();
+ public boolean hasMultipleValues();
public int getSingleValueRow(int rowNum);
public IndexedInts getMultiValueRow(int rowNum);
public String lookupName(int id);
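
A hedged sketch of the consumer-side contract this method adds, mirroring how QueryableIndexIndexableAdapter (later in this diff) reads rows: callers branch on hasMultipleValues() before choosing an accessor. The wrapper class name is illustrative only.

import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.kv.IndexedInts;

class DictionaryColumnReader
{
  // Print every value in one row, using the accessor matching the
  // column's single- vs. multi-value shape.
  static void printRow(DictionaryEncodedColumn column, int rowNum)
  {
    if (column.hasMultipleValues()) {
      IndexedInts ids = column.getMultiValueRow(rowNum);
      for (int i = 0; i < ids.size(); i++) {
        System.out.println(column.lookupName(ids.get(i)));
      }
    } else {
      System.out.println(column.lookupName(column.getSingleValueRow(rowNum)));
    }
  }
}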

View File

@@ -19,8 +19,7 @@
package com.metamx.druid.index.column;
- import com.google.common.base.Supplier;
- import com.metamx.druid.kv.IndexedFloats;
+ import com.metamx.druid.index.v1.CompressedFloatsIndexedSupplier;
/**
*/
@@ -29,9 +28,9 @@ public class FloatColumn extends AbstractColumn
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.FLOAT);
- private final Supplier<IndexedFloats> column;
+ private final CompressedFloatsIndexedSupplier column;
- public FloatColumn(Supplier<IndexedFloats> column)
+ public FloatColumn(CompressedFloatsIndexedSupplier column)
{
this.column = column;
}
@@ -42,6 +41,12 @@
return CAPABILITIES;
}
+ @Override
+ public int getLength()
+ {
+ return column.size();
+ }
@Override
public GenericColumn getGenericColumn()
{

View File

@@ -30,6 +30,8 @@ import java.io.Closeable;
public interface GenericColumn extends Closeable
{
public int size();
+ public ValueType getType();
+ public boolean hasMultipleValues();
public String getStringSingleValueRow(int rowNum);
public Indexed<String> getStringMultiValueRow(int rowNum);

View File

@@ -28,12 +28,14 @@ import java.io.IOException;
public class IndexedComplexColumn implements ComplexColumn
{
private final Indexed column;
+ private final String typeName;
public IndexedComplexColumn(
- Indexed column
+ String typeName, Indexed column
)
{
this.column = column;
+ this.typeName = typeName;
}
@Override
public Class<?> getClazz()
@@ -41,6 +43,12 @@ public class IndexedComplexColumn implements ComplexColumn
return column.getClazz();
}
+ @Override
+ public String getTypeName()
+ {
+ return typeName;
+ }
@Override
public Object getRowValue(int rowNum)
{

View File

@@ -43,6 +43,18 @@ public class IndexedFloatsGenericColumn implements GenericColumn
return column.size();
}
+ @Override
+ public ValueType getType()
+ {
+ return ValueType.FLOAT;
+ }
+ @Override
+ public boolean hasMultipleValues()
+ {
+ return false;
+ }
@Override
public String getStringSingleValueRow(int rowNum)
{

View File

@@ -43,6 +43,18 @@ public class IndexedLongsGenericColumn implements GenericColumn
return column.size();
}
+ @Override
+ public ValueType getType()
+ {
+ return ValueType.LONG;
+ }
+ @Override
+ public boolean hasMultipleValues()
+ {
+ return false;
+ }
@Override
public String getStringSingleValueRow(int rowNum)
{

View File

@@ -19,8 +19,7 @@
package com.metamx.druid.index.column;
- import com.google.common.base.Supplier;
- import com.metamx.druid.kv.IndexedLongs;
+ import com.metamx.druid.index.v1.CompressedLongsIndexedSupplier;
/**
*/
@@ -29,9 +28,9 @@ public class LongColumn extends AbstractColumn
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.LONG);
- private final Supplier<IndexedLongs> column;
+ private final CompressedLongsIndexedSupplier column;
- public LongColumn(Supplier<IndexedLongs> column)
+ public LongColumn(CompressedLongsIndexedSupplier column)
{
this.column = column;
}
@@ -42,6 +41,12 @@
return CAPABILITIES;
}
+ @Override
+ public int getLength()
+ {
+ return column.size();
+ }
@Override
public GenericColumn getGenericColumn()
{

View File

@@ -20,6 +20,7 @@
package com.metamx.druid.index.column;
import com.google.common.base.Supplier;
+ import com.google.common.io.Closeables;
/**
*/
@@ -55,6 +56,19 @@ class SimpleColumn implements Column
return capabilities;
}
+ @Override
+ public int getLength()
+ {
+ GenericColumn column = null;
+ try {
+ column = genericColumn.get();
+ return column.size();
+ }
+ finally {
+ Closeables.closeQuietly(column);
+ }
+ }
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{

View File

@@ -46,7 +46,13 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
@Override
public int size()
{
- return column == null ? multiValueColumn.size() : column.size();
+ return hasMultipleValues() ? multiValueColumn.size() : column.size();
}
+ @Override
+ public boolean hasMultipleValues()
+ {
+ return column == null;
+ }
@Override

View File

@@ -55,6 +55,12 @@ public class StringMultiValueColumn extends AbstractColumn
return CAPABILITIES;
}
+ @Override
+ public int getLength()
+ {
+ return column.size();
+ }
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{
@@ -66,6 +72,12 @@
return column.size();
}
+ @Override
+ public boolean hasMultipleValues()
+ {
+ return true;
+ }
@Override
public int getSingleValueRow(int rowNum)
{

View File

@@ -51,7 +51,12 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
{
final int index = dictionary.indexOf(value);
- return index >= 0 ? bitmaps.get(index) : EMPTY_SET;
+ if (index < 0) {
+ return EMPTY_SET;
+ }
+ final ImmutableConciseSet bitmap = bitmaps.get(index);
+ return bitmap == null ? EMPTY_SET : bitmap;
}
};
}

View File

@@ -36,25 +36,28 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
{
@JsonCreator
public static ComplexColumnPartSerde createDeserializer(
- @JsonProperty("complexType") String complexType
+ @JsonProperty("typeName") String complexType
)
{
return new ComplexColumnPartSerde(null, complexType);
}
private final GenericIndexed column;
+ private final String typeName;
private final ComplexMetricSerde serde;
- public ComplexColumnPartSerde(GenericIndexed column, String complexType)
+ public ComplexColumnPartSerde(GenericIndexed column, String typeName)
{
this.column = column;
- serde = ComplexMetrics.getSerdeForType(complexType);
+ this.typeName = typeName;
+ serde = ComplexMetrics.getSerdeForType(typeName);
}
@JsonProperty
- public GenericIndexed getColumn()
+ public String getTypeName()
{
- return column;
+ return typeName;
}
@Override

View File

@@ -32,8 +32,7 @@ public class ComplexColumnPartSupplier implements Supplier<ComplexColumn>
private final String typeName;
public ComplexColumnPartSupplier(
- final GenericIndexed complexType,
- final String typeName
+ final String typeName, final GenericIndexed complexType
) {
this.complexType = complexType;
this.typeName = typeName;
@@ -42,6 +41,6 @@ public class ComplexColumnPartSupplier implements Supplier<ComplexColumn>
@Override
public ComplexColumn get()
{
- return new IndexedComplexColumn(complexType);
+ return new IndexedComplexColumn(typeName, complexType);
}
}

View File

@@ -59,6 +59,11 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
this.baseFloatBuffers = baseFloatBuffers;
}
+ public int size()
+ {
+ return totalSize;
+ }
@Override
public IndexedFloats get()
{

View File

@@ -524,7 +524,7 @@ public class IncrementalIndex implements Iterable<Row>
public String get(String value)
{
- return poorMansInterning.get(value);
+ return value == null ? null : poorMansInterning.get(value);
}
public int getId(String value)
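
The one-line change above guards interning against null dimension values. A self-contained sketch of the pattern (class and field names hypothetical; this hunk only shows the lookup side): concurrent map pools reject null keys, so null must short-circuit before the map is consulted.

import java.util.concurrent.ConcurrentHashMap;

class PoorMansInterner
{
  private final ConcurrentHashMap<String, String> pool = new ConcurrentHashMap<String, String>();

  String get(String value)
  {
    if (value == null) {
      return null; // without this guard a null dimension value would throw NullPointerException
    }
    String prior = pool.putIfAbsent(value, value);
    return prior == null ? value : prior;
  }
}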

View File

@@ -21,6 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -58,6 +59,7 @@ import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import com.metamx.druid.utils.SerializerUtils;
+ import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Interval;
@@ -70,6 +72,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.AbstractList;
+ import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -99,7 +102,7 @@ public class IndexIO
private static final Logger log = new Logger(IndexIO.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
- private static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
+ public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
// This should really be provided by DI, should be changed once we switch around to using a DI framework
private static final ObjectMapper mapper = new DefaultObjectMapper();
@@ -120,6 +123,7 @@ public class IndexIO
return handler.canBeMapped(inDir);
}
+ @Deprecated
public static MMappedIndex mapDir(final File inDir) throws IOException
{
init();
@@ -332,7 +336,7 @@
throw new UnsupportedOperationException("Shouldn't ever happen in a cluster that is not owned by MMX.");
}
- public void convertV8toV9(File v8Dir, File v9Dir) throws IOException
+ public static void convertV8toV9(File v8Dir, File v9Dir) throws IOException
{
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
@@ -383,22 +387,70 @@
serializerUtils.writeString(nameBAOS, dimension);
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
- final GenericIndexed<String> dictionary = GenericIndexed.read(
+ GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.stringStrategy
);
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
+ GenericIndexed<ImmutableConciseSet> bitmaps = bitmapIndexes.get(dimension);
boolean onlyOneValue = true;
- for (VSizeIndexedInts rowValue : multiValCol) {
+ ConciseSet nullsSet = null;
+ for (int i = 0; i < multiValCol.size(); ++i) {
+ VSizeIndexedInts rowValue = multiValCol.get(i);
if (!onlyOneValue) {
break;
}
if (rowValue.size() > 1) {
onlyOneValue = false;
}
+ if (rowValue.size() == 0) {
+ if (nullsSet == null) {
+ nullsSet = new ConciseSet();
+ }
+ nullsSet.add(i);
+ }
}
if (onlyOneValue) {
log.info("Dimension[%s] is single value, converting...", dimension);
+ final boolean bumpedDictionary;
+ if (nullsSet != null) {
+ log.info("Dimension[%s] has null rows.", dimension);
+ final ImmutableConciseSet theNullSet = ImmutableConciseSet.newImmutableFromMutable(nullsSet);
+ if (dictionary.get(0) != null) {
+ log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
+ bumpedDictionary = true;
+ final List<String> nullList = Lists.newArrayList();
+ nullList.add(null);
+ dictionary = GenericIndexed.fromIterable(
+ Iterables.concat(nullList, dictionary),
+ GenericIndexed.stringStrategy
+ );
+ bitmaps = GenericIndexed.fromIterable(
+ Iterables.concat(Arrays.asList(theNullSet), bitmaps),
+ ConciseCompressedIndexedInts.objectStrategy
+ );
+ }
+ else {
+ bumpedDictionary = false;
+ bitmaps = GenericIndexed.fromIterable(
+ Iterables.concat(
+ Arrays.asList(ImmutableConciseSet.union(theNullSet, bitmaps.get(0))),
+ Iterables.skip(bitmaps, 1)
+ ),
+ ConciseCompressedIndexedInts.objectStrategy
+ );
+ }
+ }
+ else {
+ bumpedDictionary = false;
+ }
final VSizeIndexed finalMultiValCol = multiValCol;
singleValCol = VSizeIndexedInts.fromList(
new AbstractList<Integer>()
@@ -406,7 +458,8 @@
@Override
public Integer get(int index)
{
- return finalMultiValCol.get(index).get(0);
+ final VSizeIndexedInts ints = finalMultiValCol.get(index);
+ return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
@Override
@@ -423,7 +476,7 @@
}
builder.addSerde(
- new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmapIndexes.get(dimension))
+ new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmaps)
);
final ColumnDescriptor serdeficator = builder.build();
@@ -587,7 +640,7 @@
.setType(ValueType.COMPLEX)
.setComplexColumn(
new ComplexColumnPartSupplier(
- (GenericIndexed) metricHolder.complexType, metricHolder.getTypeName()
+ metricHolder.getTypeName(), (GenericIndexed) metricHolder.complexType
)
)
.build()
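
The conversion logic above is the subtlest change in this commit: rows with zero values for a dimension are collected into a nulls bitmap, and if the sorted dictionary does not already start with null, a null entry is prepended at id 0, shifting every stored row id by one. An illustrative, self-contained model of that bookkeeping (plain collections standing in for Druid's GenericIndexed and ConciseSet):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DictionaryBumpModel
{
  public static void main(String[] args)
  {
    List<String> dictionary = new ArrayList<String>(Arrays.asList("en", "fr"));
    int[] rowIds = {0, 1, 0};     // single-value rows, pre-conversion
    boolean hasNullRows = true;   // some rows had rowValue.size() == 0

    // Bump only when null rows exist and id 0 is not already null.
    boolean bumpedDictionary = hasNullRows && dictionary.get(0) != null;
    if (bumpedDictionary) {
      dictionary.add(0, null);    // null takes id 0, like the diff's nullList concat
    }
    for (int i = 0; i < rowIds.length; i++) {
      rowIds[i] += bumpedDictionary ? 1 : 0; // mirrors ints.get(0) + (bumpedDictionary ? 1 : 0)
    }
    System.out.println(dictionary + " -> " + Arrays.toString(rowIds));
  }
}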

View File

@@ -38,10 +38,10 @@ import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ParserUtils;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
+ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
- import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.input.MapBasedInputRow;
import org.apache.commons.io.FileUtils;
@@ -359,7 +359,7 @@ public class IndexGeneratorJob implements Jobby
log.info("%,d lines completed.", lineCount);
- List<MMappedIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
+ List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
@@ -389,9 +389,9 @@
toMerge.add(finalFile);
for (File file : toMerge) {
- indexes.add(IndexIO.mapDir(file));
+ indexes.add(IndexIO.loadIndex(file));
}
- mergedBase = IndexMerger.mergeMMapped(
+ mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
{
@Override

View File

@@ -28,16 +28,16 @@ import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
+ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
- import com.metamx.druid.index.v1.MMappedIndex;
+ import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.PlumberSchool;
import com.metamx.druid.realtime.Schema;
- import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.realtime.Sink;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -130,13 +130,13 @@ public class YeOldePlumberSchool implements PlumberSchool
} else if(spilled.size() == 1) {
fileToUpload = Iterables.getOnlyElement(spilled);
} else {
- List<MMappedIndex> indexes = Lists.newArrayList();
+ List<QueryableIndex> indexes = Lists.newArrayList();
for (final File oneSpill : spilled) {
- indexes.add(IndexIO.mapDir(oneSpill));
+ indexes.add(IndexIO.loadIndex(oneSpill));
}
fileToUpload = new File(tmpSegmentDir, "merged");
- IndexMerger.mergeMMapped(indexes, schema.getAggregators(), fileToUpload);
+ IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
// Map merged segment so we can extract dimensions

View File

@@ -30,7 +30,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.IndexableAdapter;
- import com.metamx.druid.index.v1.MMappedIndexAdapter;
+ import com.metamx.druid.index.v1.QueryableIndexIndexableAdapter;
import com.metamx.druid.index.v1.Rowboat;
import com.metamx.druid.index.v1.RowboatFilteringIndexAdapter;
import org.codehaus.jackson.annotate.JsonCreator;
@@ -90,8 +90,8 @@ public class AppendTask extends MergeTask
for (final SegmentToMergeHolder holder : segmentsToMerge) {
adapters.add(
new RowboatFilteringIndexAdapter(
- new MMappedIndexAdapter(
- IndexIO.mapDir(holder.getFile())
+ new QueryableIndexIndexableAdapter(
+ IndexIO.loadIndex(holder.getFile())
),
new Predicate<Rowboat>()
{

View File

@@ -25,9 +25,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
+ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
- import com.metamx.druid.index.v1.MMappedIndex;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -57,16 +57,16 @@ public class DefaultMergeTask extends MergeTask
public File merge(final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
- return IndexMerger.mergeMMapped(
+ return IndexMerger.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),
- new Function<File, MMappedIndex>()
+ new Function<File, QueryableIndex>()
{
@Override
- public MMappedIndex apply(@Nullable File input)
+ public QueryableIndex apply(@Nullable File input)
{
try {
- return IndexIO.mapDir(input);
+ return IndexIO.loadIndex(input);
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@@ -68,7 +68,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
- <version>0.18.0</version>
+ <version>0.19.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@@ -308,30 +308,21 @@ public class RealtimePlumberSchool implements PlumberSchool
final File mergedFile;
try {
- List<MMappedIndex> indexes = Lists.newArrayList();
+ List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
- if (queryableIndex instanceof MMappedIndexQueryableIndex) {
- log.info("Adding hydrant[%s]", fireHydrant);
- indexes.add(((MMappedIndexQueryableIndex) queryableIndex).getIndex());
- }
- else {
- log.makeAlert("[%s] Failure to merge-n-push", schema.getDataSource())
- .addData("type", "Unknown segment type")
- .addData("adapterClass", segment.getClass().toString())
- .emit();
- return;
- }
+ log.info("Adding hydrant[%s]", fireHydrant);
+ indexes.add(queryableIndex);
}
- mergedFile = IndexMerger.mergeMMapped(
+ mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
new File(computePersistDir(schema, interval), "merged")
);
- MMappedIndex index = IndexIO.mapDir(mergedFile);
+ QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = segmentPusher.push(
mergedFile,
@@ -503,9 +494,7 @@ public class RealtimePlumberSchool implements PlumberSchool
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
- indexToPersist.swapSegment(
- new QueryableIndexSegment(null, new MMappedIndexQueryableIndex(IndexIO.mapDir(persistedFile)))
- );
+ indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
return numRows;
}

View File

@@ -19,7 +19,6 @@
package com.metamx.druid.index.v1;
- import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.kv.FlattenedArrayWriter;
@@ -27,7 +26,6 @@ import com.metamx.druid.kv.IOPeon;
import java.io.File;
import java.io.IOException;
- import java.nio.ByteOrder;
/**
*/
@@ -75,18 +73,12 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
{
writer.close();
- final File littleEndianFile = IndexIO.makeMetricFile(outDir, metricName, ByteOrder.LITTLE_ENDIAN);
- littleEndianFile.delete();
+ final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
+ outFile.delete();
MetricHolder.writeComplexMetric(
- Files.newOutputStreamSupplier(littleEndianFile, true), metricName, serde.getTypeName(), writer
- );
- IndexIO.checkFileSize(littleEndianFile);
- final File bigEndianFile = IndexIO.makeMetricFile(outDir, metricName, ByteOrder.BIG_ENDIAN);
- ByteStreams.copy(
- Files.newInputStreamSupplier(littleEndianFile),
- Files.newOutputStreamSupplier(bigEndianFile, false)
+ Files.newOutputStreamSupplier(outFile, true), metricName, serde.getTypeName(), writer
);
+ IndexIO.checkFileSize(outFile);
writer = null;
}

View File

@@ -24,7 +24,6 @@ import com.metamx.druid.kv.IOPeon;
import java.io.File;
import java.io.IOException;
- import java.nio.ByteOrder;
/**
*/
@@ -34,8 +33,7 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
private final IOPeon ioPeon;
private final File outDir;
- private CompressedFloatsSupplierSerializer littleMetricsWriter;
- private CompressedFloatsSupplierSerializer bigEndianMetricsWriter;
+ private CompressedFloatsSupplierSerializer writer;
public FloatMetricColumnSerializer(
String metricName,
@@ -51,43 +49,30 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
@Override
public void open() throws IOException
{
- littleMetricsWriter = CompressedFloatsSupplierSerializer.create(
- ioPeon, String.format("%s_little", metricName), ByteOrder.LITTLE_ENDIAN
- );
- bigEndianMetricsWriter = CompressedFloatsSupplierSerializer.create(
- ioPeon, String.format("%s_big", metricName), ByteOrder.BIG_ENDIAN
+ writer = CompressedFloatsSupplierSerializer.create(
+ ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER
);
- littleMetricsWriter.open();
- bigEndianMetricsWriter.open();
+ writer.open();
}
@Override
public void serialize(Object obj) throws IOException
{
float val = (obj == null) ? 0 : ((Number) obj).floatValue();
- littleMetricsWriter.add(val);
- bigEndianMetricsWriter.add(val);
+ writer.add(val);
}
@Override
public void close() throws IOException
{
- final File littleEndianFile = IndexIO.makeMetricFile(outDir, metricName, ByteOrder.LITTLE_ENDIAN);
- littleEndianFile.delete();
+ final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
+ outFile.delete();
MetricHolder.writeFloatMetric(
- Files.newOutputStreamSupplier(littleEndianFile, true), metricName, littleMetricsWriter
+ Files.newOutputStreamSupplier(outFile, true), metricName, writer
);
- IndexIO.checkFileSize(littleEndianFile);
+ IndexIO.checkFileSize(outFile);
- final File bigEndianFile = IndexIO.makeMetricFile(outDir, metricName, ByteOrder.BIG_ENDIAN);
- bigEndianFile.delete();
- MetricHolder.writeFloatMetric(
- Files.newOutputStreamSupplier(bigEndianFile, true), metricName, bigEndianMetricsWriter
- );
- IndexIO.checkFileSize(bigEndianFile);
- littleMetricsWriter = null;
- bigEndianMetricsWriter = null;
+ writer = null;
}
}
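
This file and ComplexMetricColumnSerializer above drop the dual-endian writes: each metric is now written once in the platform's native byte order, published as IndexIO.BYTE_ORDER. A standalone illustration (not Druid code) of what that constant resolves to and how a buffer adopts it:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class NativeOrderDemo
{
  public static void main(String[] args)
  {
    ByteOrder order = ByteOrder.nativeOrder(); // the value IndexIO.BYTE_ORDER now exposes
    ByteBuffer buf = ByteBuffer.allocate(4).order(order);
    buf.putFloat(1.5f); // written once, in native order, instead of twice
    System.out.println("float written in " + order + " order");
  }
}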

View File

@@ -44,6 +44,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.ToLowerCaseAggregatorFactory;
import com.metamx.druid.guava.FileOutputSupplier;
import com.metamx.druid.guava.GuavaUtils;
+ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
@@ -75,8 +76,10 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
+ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Set;
import java.util.TreeSet;
/**
@@ -139,26 +142,26 @@ public class IndexMerger
);
}
- public static File mergeMMapped(
- List<MMappedIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
+ public static File mergeQueryableIndex(
+ List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
) throws IOException
{
- return mergeMMapped(indexes, metricAggs, outDir, new NoopProgressIndicator());
+ return mergeQueryableIndex(indexes, metricAggs, outDir, new NoopProgressIndicator());
}
- public static File mergeMMapped(
- List<MMappedIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress
+ public static File mergeQueryableIndex(
+ List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress
) throws IOException
{
return merge(
Lists.transform(
indexes,
- new Function<MMappedIndex, IndexableAdapter>()
+ new Function<QueryableIndex, IndexableAdapter>()
{
@Override
- public IndexableAdapter apply(@Nullable final MMappedIndex input)
+ public IndexableAdapter apply(final QueryableIndex input)
{
- return new MMappedIndexAdapter(input);
+ return new QueryableIndexIndexableAdapter(input);
}
}
),
@@ -384,7 +387,6 @@
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
) throws IOException
{
- // TODO: make v9 index, complain to Eric when you see this, cause he should be doing it.
Map<String, String> metricTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
for (IndexableAdapter adapter : indexes) {
for (String metric : adapter.getAvailableMetrics()) {
@@ -392,11 +394,13 @@
}
}
final Interval dataInterval;
+ File v8OutDir = new File(outDir, "v8-tmp");
+ v8OutDir.mkdirs();
/************* Main index.drd file **************/
progress.progress();
long startTime = System.currentTimeMillis();
- File indexFile = new File(outDir, "index.drd");
+ File indexFile = new File(v8OutDir, "index.drd");
FileOutputStream fileOutputStream = null;
FileChannel channel = null;
@@ -426,7 +430,7 @@
fileOutputStream = null;
}
IndexIO.checkFileSize(indexFile);
- log.info("outDir[%s] completed index.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
+ log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Setup Dim Conversions **************/
progress.progress();
@@ -499,7 +503,7 @@
}
dimensionCardinalities.put(dimension, count);
- FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(outDir, dimension), true);
+ FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
dimOuts.add(dimOut);
writer.close();
@@ -514,7 +518,7 @@
ioPeon.cleanup();
}
- log.info("outDir[%s] completed dim conversions in %,d millis.", outDir, System.currentTimeMillis() - startTime);
+ log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Walk through data sets and merge them *************/
progress.progress();
@@ -573,15 +577,11 @@
Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
- CompressedLongsSupplierSerializer littleEndianTimeWriter = CompressedLongsSupplierSerializer.create(
- ioPeon, "little_end_time", ByteOrder.LITTLE_ENDIAN
- );
- CompressedLongsSupplierSerializer bigEndianTimeWriter = CompressedLongsSupplierSerializer.create(
- ioPeon, "big_end_time", ByteOrder.BIG_ENDIAN
+ CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
+ ioPeon, "little_end_time", IndexIO.BYTE_ORDER
);
- littleEndianTimeWriter.open();
- bigEndianTimeWriter.open();
+ timeWriter.open();
ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (String dimension : mergedDimensions) {
@@ -595,7 +595,7 @@
String metric = entry.getKey();
String typeName = entry.getValue();
if ("float".equals(typeName)) {
- metWriters.add(new FloatMetricColumnSerializer(metric, outDir, ioPeon));
+ metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
} else {
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
@@ -603,7 +603,7 @@
throw new ISE("Unknown type[%s]", typeName);
}
- metWriters.add(new ComplexMetricColumnSerializer(metric, outDir, ioPeon, serde));
+ metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
}
}
for (MetricColumnSerializer metWriter : metWriters) {
@@ -621,8 +621,7 @@
for (Rowboat theRow : theRows) {
progress.progress();
- littleEndianTimeWriter.add(theRow.getTimestamp());
- bigEndianTimeWriter.add(theRow.getTimestamp());
+ timeWriter.add(theRow.getTimestamp());
final Object[] metrics = theRow.getMetrics();
for (int i = 0; i < metrics.length; ++i) {
@@ -650,7 +649,7 @@
if ((++rowCount % 500000) == 0) {
log.info(
- "outDir[%s] walked 500,000/%,d rows in %,d millis.", outDir, rowCount, System.currentTimeMillis() - time
+ "outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
);
time = System.currentTimeMillis();
}
@@ -660,17 +659,11 @@
rowNumConversion.rewind();
}
- final File littleEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.LITTLE_ENDIAN);
- littleEndianFile.delete();
- OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(littleEndianFile, true);
- littleEndianTimeWriter.closeAndConsolidate(out);
- IndexIO.checkFileSize(littleEndianFile);
- final File bigEndianFile = IndexIO.makeTimeFile(outDir, ByteOrder.BIG_ENDIAN);
- bigEndianFile.delete();
- out = Files.newOutputStreamSupplier(bigEndianFile, true);
- bigEndianTimeWriter.closeAndConsolidate(out);
- IndexIO.checkFileSize(bigEndianFile);
+ final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
+ timeFile.delete();
+ OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
+ timeWriter.closeAndConsolidate(out);
+ IndexIO.checkFileSize(timeFile);
for (int i = 0; i < mergedDimensions.size(); ++i) {
forwardDimWriters.get(i).close();
@@ -684,7 +677,7 @@
ioPeon.cleanup();
log.info(
"outDir[%s] completed walk through of %,d rows in %,d millis.",
- outDir,
+ v8OutDir,
rowCount,
System.currentTimeMillis() - startTime
);
@@ -692,7 +685,7 @@
/************ Create Inverted Indexes *************/
startTime = System.currentTimeMillis();
- final File invertedFile = new File(outDir, "inverted.drd");
+ final File invertedFile = new File(v8OutDir, "inverted.drd");
Files.touch(invertedFile);
out = Files.newOutputStreamSupplier(invertedFile, true);
for (int i = 0; i < mergedDimensions.size(); ++i) {
@@ -725,10 +718,7 @@
}
ConciseSet bitset = new ConciseSet();
- for (Integer row : CombiningIterable.createSplatted(
- convertedInverteds,
- Ordering.<Integer>natural().nullsFirst()
- )) {
+ for (Integer row : CombiningIterable.createSplatted(convertedInverteds, Ordering.<Integer>natural().nullsFirst())) {
if (row != INVALID_ROW) {
bitset.add(row);
}
@@ -744,33 +734,34 @@
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
}
- log.info("outDir[%s] completed inverted.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
+ log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
final ArrayList<String> expectedFiles = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
- "index.drd", "inverted.drd", "time_BIG_ENDIAN.drd", "time_LITTLE_ENDIAN.drd"
+ "index.drd", "inverted.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
),
Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
- Iterables.transform(mergedMetrics, GuavaUtils.formatFunction("met_%s_LITTLE_ENDIAN.drd")),
- Iterables.transform(mergedMetrics, GuavaUtils.formatFunction("met_%s_BIG_ENDIAN.drd"))
+ Iterables.transform(
+ mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
+ )
)
);
Map<String, File> files = Maps.newLinkedHashMap();
for (String fileName : expectedFiles) {
- files.put(fileName, new File(outDir, fileName));
+ files.put(fileName, new File(v8OutDir, fileName));
}
- File smooshDir = new File(outDir, "smoosher");
+ File smooshDir = new File(v8OutDir, "smoosher");
smooshDir.mkdir();
- for (Map.Entry<String, File> entry : Smoosh.smoosh(outDir, smooshDir, files).entrySet()) {
+ for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
entry.getValue().delete();
}
for (File file : smooshDir.listFiles()) {
- Files.move(file, new File(outDir, file.getName()));
+ Files.move(file, new File(v8OutDir, file.getName()));
}
if (!smooshDir.delete()) {
@@ -780,18 +771,21 @@
createIndexDrdFile(
IndexIO.CURRENT_VERSION_ID,
- outDir,
+ v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
dataInterval
);
+ IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir);
+ FileUtils.deleteDirectory(v8OutDir);
return outDir;
}
private static <T extends Comparable> ArrayList<T> mergeIndexed(final List<Iterable<T>> indexedLists)
{
- TreeSet<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst());
+ Set<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst());
for (Iterable<T> indexedList : indexedLists) {
for (T val : indexedList) {
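
A hedged caller-side sketch of the renamed entry point, assembled only from signatures visible in this commit (IndexIO.loadIndex, IndexMerger.mergeQueryableIndex); the segment directories and the empty aggregator array are placeholders.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;

class MergeSketch
{
  public static void main(String[] args) throws Exception
  {
    // Load each persisted segment as a QueryableIndex, then merge them into
    // one output directory (per this diff, the merge now writes to a v8-tmp
    // directory and converts it to v9 before returning).
    List<QueryableIndex> indexes = new ArrayList<QueryableIndex>();
    for (String dir : new String[]{"/tmp/spill1", "/tmp/spill2"}) {
      indexes.add(IndexIO.loadIndex(new File(dir)));
    }
    File merged = IndexMerger.mergeQueryableIndex(
        indexes, new AggregatorFactory[]{}, new File("/tmp/merged")
    );
    System.out.println("merged segment at " + merged);
  }
}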

View File

@@ -53,6 +53,12 @@ public class MMappedIndexQueryableIndex implements QueryableIndex
return index.getDataInterval();
}
+ @Override
+ public int getNumRows()
+ {
+ return index.getTimestamps().size();
+ }
@Override
public Indexed<String> getColumnNames()
{
@@ -91,7 +97,7 @@ public class MMappedIndexQueryableIndex implements QueryableIndex
return new FloatColumn(metricHolder.floatType);
}
else {
- return new ComplexColumnImpl(metricHolder.getComplexType());
+ return new ComplexColumnImpl(metricHolder.getTypeName(), metricHolder.getComplexType());
}
}
}

View File

@@ -0,0 +1,290 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.kv.ArrayBasedIndexedInts;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.EmptyIndexedInts;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.ListIndexed;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
/**
*/
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private final int numRows;
private final QueryableIndex input;
public QueryableIndexIndexableAdapter(QueryableIndex input)
{
this.input = input;
numRows = input.getNumRows();
}
@Override
public Interval getDataInterval()
{
return input.getDataInterval();
}
@Override
public int getNumRows()
{
return numRows;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return input.getAvailableDimensions();
}
@Override
public Indexed<String> getAvailableMetrics()
{
final Set<String> columns = Sets.newLinkedHashSet(input.getColumnNames());
final HashSet<String> dimensions = Sets.newHashSet(getAvailableDimensions());
return new ListIndexed<String>(
Lists.newArrayList(Sets.difference(columns, dimensions)),
String.class
);
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
final Column column = input.getColumn(dimension);
if (column == null) {
return null;
}
final DictionaryEncodedColumn dict = column.getDictionaryEncoding();
if (dict == null) {
return null;
}
return new Indexed<String>()
{
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dict.getCardinality();
}
@Override
public String get(int index)
{
return dict.lookupName(index);
}
@Override
public int indexOf(String value)
{
return dict.lookupId(value);
}
@Override
public Iterator<String> iterator()
{
return IndexedIterable.create(this).iterator();
}
};
}
@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final GenericColumn timestamps = input.getTimeColumn().getGenericColumn();
final Object[] metrics;
final Map<String, DictionaryEncodedColumn> dimensions;
final int numMetrics = getAvailableMetrics().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : input.getAvailableDimensions()) {
dimensions.put(dim, input.getColumn(dim).getDictionaryEncoding());
}
final Indexed<String> availableMetrics = getAvailableMetrics();
metrics = new Object[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
final Column column = input.getColumn(availableMetrics.get(i));
final ValueType type = column.getCapabilities().getType();
switch (type) {
case FLOAT:
metrics[i] = column.getGenericColumn();
break;
case COMPLEX:
metrics[i] = column.getComplexColumn();
break;
default:
throw new ISE("Cannot handle type[%s]", type);
}
}
}
@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (Object metric : metrics) {
if (metric instanceof Closeable) {
Closeables.closeQuietly((Closeable) metric);
}
}
done = true;
}
return hasNext;
}
@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
final DictionaryEncodedColumn dict = dimensions.get(dim);
final IndexedInts dimVals;
if (dict.hasMultipleValues()) {
dimVals = dict.getMultiValueRow(currRow);
}
else {
dimVals = new ArrayBasedIndexedInts(new int[]{dict.getSingleValueRow(currRow)});
}
int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}
dims[dimIndex++] = theVals;
}
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
if (metrics[i] instanceof GenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
}
else if (metrics[i] instanceof ComplexColumn) {
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
}
}
final Rowboat retVal = new Rowboat(
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow
);
++currRow;
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
@Override
public IndexedInts getInverteds(String dimension, String value)
{
final Column column = input.getColumn(dimension);
if (column == null) {
return new EmptyIndexedInts();
}
final BitmapIndex bitmaps = column.getBitmapIndex();
if (bitmaps == null) {
return new EmptyIndexedInts();
}
return new ConciseCompressedIndexedInts(bitmaps.getConciseSet(value));
}
@Override
public String getMetricType(String metric)
{
final Column column = input.getColumn(metric);
final ValueType type = column.getCapabilities().getType();
switch (type) {
case FLOAT:
return "float";
case COMPLEX:
return column.getComplexColumn().getTypeName();
default:
throw new ISE("Unknown type[%s]", type);
}
}
}

View File

@@ -277,7 +277,10 @@ public class GroupByQueryEngine
dimNames = new String[dimensionSpecs.size()];
for (int i = 0; i < dimensionSpecs.size(); ++i) {
final DimensionSpec dimSpec = dimensionSpecs.get(i);
- dimensions.add(cursor.makeDimensionSelector(dimSpec.getDimension()));
+ final DimensionSelector selector = cursor.makeDimensionSelector(dimSpec.getDimension());
+ if (selector != null) {
+ dimensions.add(selector);
+ }
dimNames[i] = dimSpec.getOutputName();
}

View File

@@ -23,12 +23,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
+ import com.metamx.druid.index.QueryableIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
- import java.util.ArrayList;
public class EmptyIndexTest
{
@@ -48,11 +48,11 @@ public class EmptyIndexTest
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
IndexMerger.merge(Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir);
- MMappedIndex emptyIndexMMapped = IndexIO.mapDir(tmpDir);
+ QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir);
- Assert.assertEquals("getAvailableDimensions", 0, Iterables.size(emptyIndexMMapped.getAvailableDimensions()));
- Assert.assertEquals("getAvailableMetrics", 0, Iterables.size(emptyIndexMMapped.getAvailableMetrics()));
- Assert.assertEquals("getDataInterval", new Interval("2012-08-01/P3D"), emptyIndexMMapped.getDataInterval());
- Assert.assertEquals("getReadOnlyTimestamps", 0, emptyIndexMMapped.getReadOnlyTimestamps().size());
+ Assert.assertEquals("getAvailableDimensions", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
+ Assert.assertEquals("getAvailableMetrics", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));
+ Assert.assertEquals("getDataInterval", new Interval("2012-08-01/P3D"), emptyQueryableIndex.getDataInterval());
+ Assert.assertEquals("getReadOnlyTimestamps", 0, emptyQueryableIndex.getTimeColumn().getLength());
}
}

View File

@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
+ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.input.MapBasedInputRow;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
@@ -44,11 +45,11 @@ public class IndexMergerTest
final File tempDir = Files.createTempDir();
try {
- MMappedIndex index = IndexIO.mapDir(IndexMerger.persist(toPersist, tempDir));
+ QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
- Assert.assertEquals(2, index.getTimestamps().size());
+ Assert.assertEquals(2, index.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
- Assert.assertEquals(0, index.getAvailableMetrics().size());
+ Assert.assertEquals(2, index.getColumnNames().size());
}
finally {
tempDir.delete();
@@ -84,25 +85,25 @@ public class IndexMergerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
- MMappedIndex index1 = IndexIO.mapDir(IndexMerger.persist(toPersist1, tempDir1));
+ QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
- Assert.assertEquals(2, index1.getTimestamps().size());
+ Assert.assertEquals(2, index1.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
- Assert.assertEquals(0, index1.getAvailableMetrics().size());
+ Assert.assertEquals(2, index1.getColumnNames().size());
- MMappedIndex index2 = IndexIO.mapDir(IndexMerger.persist(toPersist2, tempDir2));
+ QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
- Assert.assertEquals(2, index2.getTimestamps().size());
+ Assert.assertEquals(2, index2.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
- Assert.assertEquals(0, index2.getAvailableMetrics().size());
+ Assert.assertEquals(2, index2.getColumnNames().size());
- MMappedIndex merged = IndexIO.mapDir(
- IndexMerger.mergeMMapped(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir)
+ QueryableIndex merged = IndexIO.loadIndex(
+ IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir)
);
- Assert.assertEquals(3, merged.getTimestamps().size());
+ Assert.assertEquals(3, merged.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
- Assert.assertEquals(0, merged.getAvailableMetrics().size());
+ Assert.assertEquals(2, merged.getColumnNames().size());
}
finally {
FileUtils.deleteQuietly(tempDir1);

View File

@@ -119,8 +119,8 @@ public class TestIndex
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile);
mergedRealtime = IndexIO.loadIndex(
- IndexMerger.mergeMMapped(
- Arrays.asList(IndexIO.mapDir(topFile), IndexIO.mapDir(bottomFile)),
+ IndexMerger.mergeQueryableIndex(
+ Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile
)