Merge branch 'master' into killsegments

Conflicts:
	merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
Fangjin Yang 2013-01-21 14:47:49 -08:00
commit bab9ee8827
57 changed files with 1424 additions and 348 deletions

View File

@@ -59,6 +59,18 @@ public class DbConnector
);
}
public static void createConfigTable(final DBI dbi, final String configTableName)
{
createTable(
dbi,
configTableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))",
configTableName
)
);
}
public static void createTable(
final DBI dbi,
final String tableName,
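
The new createConfigTable helper is exercised later in this commit by IndexerCoordinatorNode.initializeWorkerSetupManager. A minimal bootstrap sketch, assuming a MySQL metadata store; the JDBC URL, credentials, table name, and DbConnector's package are assumptions, not part of this commit:

import org.skife.jdbi.v2.DBI;
import com.metamx.druid.db.DbConnector; // package path assumed; not shown in this hunk

public class ConfigTableBootstrap
{
  public static void main(String[] args)
  {
    // Connection details are illustrative only.
    final DBI dbi = new DBI("jdbc:mysql://localhost:3306/druid", "druid", "diurd");

    // Issues the statement shown above:
    // CREATE table druid_config (name VARCHAR(255) NOT NULL, payload LONGTEXT NOT NULL, PRIMARY KEY(name))
    DbConnector.createConfigTable(dbi, "druid_config"); // table name is an assumption
  }
}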

View File

@@ -2,19 +2,11 @@
"queryType": "groupBy",
"dataSource": "twitterstream",
"granularity": "all",
"dimensions": ["lang"],
"dimensions": ["lang", "utc_offset"],
"aggregations":[
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"},
{ "type": "max", "fieldName": "max_statuses_count", "name": "theMaxStatusesCount"},
{ "type": "max", "fieldName": "max_retweet_count", "name": "theMaxRetweetCount"},
{ "type": "max", "fieldName": "max_friends_count", "name": "theMaxFriendsCount"},
{ "type": "max", "fieldName": "max_follower_count", "name": "theMaxFollowerCount"},
{ "type": "doubleSum", "fieldName": "total_statuses_count", "name": "total_tweets_all_time"}
{ "type": "count", "name": "rows"},
{ "type": "doubleSum", "fieldName": "tweets", "name": "tweets"}
],
"filter": { "type": "selector", "dimension": "lang", "value": "en" },
"intervals":["2012-10-01T00:00/2020-01-01T00"]
}

View File

@@ -1,25 +1,34 @@
package druid.examples.twitter;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonTypeName;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import twitter4j.*;
import org.codehaus.jackson.annotate.JsonTypeName;
import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.*;
import static java.lang.Thread.sleep;
/**
@@ -241,30 +250,26 @@ public class TwitterSpritzerFirehoseFactory implements FirehoseFactory {
} catch (InterruptedException e) {
throw new RuntimeException("InterruptedException", e);
}
//log.info("twitterStatus: "+ status.getCreatedAt() + " @" + status.getUser().getScreenName() + " - " + status.getText());//DEBUG
// theMap.put("twid", status.getUser().getScreenName());
// theMap.put("msg", status.getText()); // ToDo: verify encoding
HashtagEntity[] hts = status.getHashtagEntities();
if (hts != null && hts.length > 0) {
// ToDo: get all the hash tags instead of just the first one
theMap.put("htags", hts[0].getText());
} else {
theMap.put("htags", null);
List<String> hashTags = Lists.newArrayListWithExpectedSize(hts.length);
for (HashtagEntity ht : hts) {
hashTags.add(ht.getText());
}
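// note: all tags are collected above, but only the first is emitted below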
theMap.put("htags", Arrays.asList(hashTags.get(0)));
}
long retweetCount = status.getRetweetCount();
theMap.put("retweet_count", retweetCount);
User u = status.getUser();
if (u != null) {
theMap.put("follower_count", u.getFollowersCount());
theMap.put("friends_count", u.getFriendsCount());
theMap.put("lang", u.getLang());
theMap.put("utc_offset", u.getUtcOffset()); // resolution in seconds, -1 if not available?
theMap.put("statuses_count", u.getStatusesCount());
} else {
log.error("status.getUser() is null");
User user = status.getUser();
if (user != null) {
theMap.put("follower_count", user.getFollowersCount());
theMap.put("friends_count", user.getFriendsCount());
theMap.put("lang", user.getLang());
theMap.put("utc_offset", user.getUtcOffset()); // resolution in seconds, -1 if not available?
theMap.put("statuses_count", user.getStatusesCount());
}
return new MapBasedInputRow(status.getCreatedAt().getTime(), dimensions, theMap);

View File

@@ -31,8 +31,8 @@
"firehose": {
"type": "twitzer",
"maxEventCount": 50000,
"maxRunMinutes": 10
"maxEventCount": 500000,
"maxRunMinutes": 120
},
"plumber": {

View File

@@ -28,6 +28,7 @@ import org.joda.time.Interval;
public interface QueryableIndex extends ColumnSelector
{
public Interval getDataInterval();
public int getNumRows();
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
}

View File

@@ -56,6 +56,12 @@ public class SimpleQueryableIndex implements QueryableIndex
return dataInterval;
}
@Override
public int getNumRows()
{
return timeColumn.getLength();
}
@Override
public Indexed<String> getColumnNames()
{

View File

@@ -25,6 +25,7 @@ public interface Column
{
public ColumnCapabilities getCapabilities();
public int getLength();
public DictionaryEncodedColumn getDictionaryEncoding();
public RunLengthColumn getRunLengthColumn();
public GenericColumn getGenericColumn();

View File

@@ -26,5 +26,6 @@ import java.io.Closeable;
public interface ComplexColumn extends Closeable
{
public Class<?> getClazz();
public String getTypeName();
public Object getRowValue(int rowNum);
}

View File

@@ -29,10 +29,12 @@ public class ComplexColumnImpl extends AbstractColumn
.setType(ValueType.COMPLEX);
private final Indexed column;
private final String typeName;
public ComplexColumnImpl(Indexed column)
public ComplexColumnImpl(String typeName, Indexed column)
{
this.column = column;
this.typeName = typeName;
}
@Override
@@ -41,9 +43,15 @@ public class ComplexColumnImpl extends AbstractColumn
return CAPABILITIES;
}
@Override
public int getLength()
{
return column.size();
}
@Override
public ComplexColumn getComplexColumn()
{
return new IndexedComplexColumn(column);
return new IndexedComplexColumn(typeName, column);
}
}

View File

@@ -26,6 +26,7 @@ import com.metamx.druid.kv.IndexedInts;
public interface DictionaryEncodedColumn
{
public int size();
public boolean hasMultipleValues();
public int getSingleValueRow(int rowNum);
public IndexedInts getMultiValueRow(int rowNum);
public String lookupName(int id);

View File

@@ -19,8 +19,7 @@
package com.metamx.druid.index.column;
import com.google.common.base.Supplier;
import com.metamx.druid.kv.IndexedFloats;
import com.metamx.druid.index.v1.CompressedFloatsIndexedSupplier;
/**
*/
@@ -29,9 +28,9 @@ public class FloatColumn extends AbstractColumn
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.FLOAT);
private final Supplier<IndexedFloats> column;
private final CompressedFloatsIndexedSupplier column;
public FloatColumn(Supplier<IndexedFloats> column)
public FloatColumn(CompressedFloatsIndexedSupplier column)
{
this.column = column;
}
@@ -42,6 +41,12 @@
return CAPABILITIES;
}
@Override
public int getLength()
{
return column.size();
}
@Override
public GenericColumn getGenericColumn()
{

View File

@@ -30,6 +30,8 @@ import java.io.Closeable;
public interface GenericColumn extends Closeable
{
public int size();
public ValueType getType();
public boolean hasMultipleValues();
public String getStringSingleValueRow(int rowNum);
public Indexed<String> getStringMultiValueRow(int rowNum);

View File

@@ -28,12 +28,14 @@ import java.io.IOException;
public class IndexedComplexColumn implements ComplexColumn
{
private final Indexed column;
private final String typeName;
public IndexedComplexColumn(
Indexed column
String typeName, Indexed column
)
{
this.column = column;
this.typeName = typeName;
}
@Override
public Class<?> getClazz()
@@ -41,6 +43,12 @@ public class IndexedComplexColumn implements ComplexColumn
return column.getClazz();
}
@Override
public String getTypeName()
{
return typeName;
}
@Override
public Object getRowValue(int rowNum)
{

View File

@@ -43,6 +43,18 @@ public class IndexedFloatsGenericColumn implements GenericColumn
return column.size();
}
@Override
public ValueType getType()
{
return ValueType.FLOAT;
}
@Override
public boolean hasMultipleValues()
{
return false;
}
@Override
public String getStringSingleValueRow(int rowNum)
{

View File

@@ -43,6 +43,18 @@ public class IndexedLongsGenericColumn implements GenericColumn
return column.size();
}
@Override
public ValueType getType()
{
return ValueType.LONG;
}
@Override
public boolean hasMultipleValues()
{
return false;
}
@Override
public String getStringSingleValueRow(int rowNum)
{

View File

@@ -19,8 +19,7 @@
package com.metamx.druid.index.column;
import com.google.common.base.Supplier;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.index.v1.CompressedLongsIndexedSupplier;
/**
*/
@@ -29,9 +28,9 @@ public class LongColumn extends AbstractColumn
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.LONG);
private final Supplier<IndexedLongs> column;
private final CompressedLongsIndexedSupplier column;
public LongColumn(Supplier<IndexedLongs> column)
public LongColumn(CompressedLongsIndexedSupplier column)
{
this.column = column;
}
@@ -42,6 +41,12 @@
return CAPABILITIES;
}
@Override
public int getLength()
{
return column.size();
}
@Override
public GenericColumn getGenericColumn()
{

View File

@@ -20,6 +20,7 @@
package com.metamx.druid.index.column;
import com.google.common.base.Supplier;
import com.google.common.io.Closeables;
/**
*/
@@ -55,6 +56,19 @@ class SimpleColumn implements Column
return capabilities;
}
@Override
public int getLength()
{
GenericColumn column = null;
try {
column = genericColumn.get();
return column.size();
}
finally {
Closeables.closeQuietly(column);
}
}
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{

View File

@@ -46,7 +46,13 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
@Override
public int size()
{
return column == null ? multiValueColumn.size() : column.size();
return hasMultipleValues() ? multiValueColumn.size() : column.size();
}
@Override
public boolean hasMultipleValues()
{
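// a null single-value column means this dimension is stored multi-valued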
return column == null;
}
@Override

View File

@@ -55,6 +55,12 @@ public class StringMultiValueColumn extends AbstractColumn
return CAPABILITIES;
}
@Override
public int getLength()
{
return column.size();
}
@Override
public DictionaryEncodedColumn getDictionaryEncoding()
{
@@ -66,6 +72,12 @@
return column.size();
}
@Override
public boolean hasMultipleValues()
{
return true;
}
@Override
public int getSingleValueRow(int rowNum)
{

View File

@@ -51,7 +51,12 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
{
final int index = dictionary.indexOf(value);
return index >= 0 ? bitmaps.get(index) : EMPTY_SET;
if (index < 0) {
return EMPTY_SET;
}
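// a dictionary value may have no bitmap at all; treat that the same as an empty one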
final ImmutableConciseSet bitmap = bitmaps.get(index);
return bitmap == null ? EMPTY_SET : bitmap;
}
};
}

View File

@@ -36,25 +36,28 @@ public class ComplexColumnPartSerde implements ColumnPartSerde
{
@JsonCreator
public static ComplexColumnPartSerde createDeserializer(
@JsonProperty("complexType") String complexType
@JsonProperty("typeName") String complexType
)
{
return new ComplexColumnPartSerde(null, complexType);
}
private final GenericIndexed column;
private final String typeName;
private final ComplexMetricSerde serde;
public ComplexColumnPartSerde(GenericIndexed column, String complexType)
public ComplexColumnPartSerde(GenericIndexed column, String typeName)
{
this.column = column;
serde = ComplexMetrics.getSerdeForType(complexType);
this.typeName = typeName;
serde = ComplexMetrics.getSerdeForType(typeName);
}
@JsonProperty
public GenericIndexed getColumn()
public String getTypeName()
{
return column;
return typeName;
}
@Override

View File

@@ -32,8 +32,7 @@ public class ComplexColumnPartSupplier implements Supplier<ComplexColumn>
private final String typeName;
public ComplexColumnPartSupplier(
final GenericIndexed complexType,
final String typeName
final String typeName, final GenericIndexed complexType
) {
this.complexType = complexType;
this.typeName = typeName;
@@ -42,6 +41,6 @@ public class ComplexColumnPartSupplier implements Supplier<ComplexColumn>
@Override
public ComplexColumn get()
{
return new IndexedComplexColumn(complexType);
return new IndexedComplexColumn(typeName, complexType);
}
}

View File

@@ -59,6 +59,11 @@ public class CompressedFloatsIndexedSupplier implements Supplier<IndexedFloats>
this.baseFloatBuffers = baseFloatBuffers;
}
public int size()
{
return totalSize;
}
@Override
public IndexedFloats get()
{

View File

@@ -524,7 +524,7 @@ public class IncrementalIndex implements Iterable<Row>
public String get(String value)
{
return poorMansInterning.get(value);
return value == null ? null : poorMansInterning.get(value);
}
public int getId(String value)

View File

@@ -21,6 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -58,6 +59,7 @@ import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import com.metamx.druid.utils.SerializerUtils;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Interval;
@@ -70,6 +72,7 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -99,7 +102,7 @@ public class IndexIO
private static final Logger log = new Logger(IndexIO.class);
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder();
// This should really be provided by DI; change it once we switch to a DI framework
private static final ObjectMapper mapper = new DefaultObjectMapper();
@@ -120,6 +123,7 @@ public class IndexIO
return handler.canBeMapped(inDir);
}
@Deprecated
public static MMappedIndex mapDir(final File inDir) throws IOException
{
init();
@@ -332,7 +336,7 @@
throw new UnsupportedOperationException("Shouldn't ever happen in a cluster that is not owned by MMX.");
}
public void convertV8toV9(File v8Dir, File v9Dir) throws IOException
public static void convertV8toV9(File v8Dir, File v9Dir) throws IOException
{
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
@@ -383,22 +387,70 @@
serializerUtils.writeString(nameBAOS, dimension);
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
final GenericIndexed<String> dictionary = GenericIndexed.read(
GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.stringStrategy
);
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableConciseSet> bitmaps = bitmapIndexes.get(dimension);
boolean onlyOneValue = true;
for (VSizeIndexedInts rowValue : multiValCol) {
ConciseSet nullsSet = null;
for (int i = 0; i < multiValCol.size(); ++i) {
VSizeIndexedInts rowValue = multiValCol.get(i);
if (!onlyOneValue) {
break;
}
if (rowValue.size() > 1) {
onlyOneValue = false;
}
if (rowValue.size() == 0) {
if (nullsSet == null) {
nullsSet = new ConciseSet();
}
nullsSet.add(i);
}
}
if (onlyOneValue) {
log.info("Dimension[%s] is single value, converting...", dimension);
final boolean bumpedDictionary;
if (nullsSet != null) {
log.info("Dimension[%s] has null rows.", dimension);
final ImmutableConciseSet theNullSet = ImmutableConciseSet.newImmutableFromMutable(nullsSet);
if (dictionary.get(0) != null) {
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
bumpedDictionary = true;
final List<String> nullList = Lists.newArrayList();
nullList.add(null);
dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dictionary),
GenericIndexed.stringStrategy
);
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(Arrays.asList(theNullSet), bitmaps),
ConciseCompressedIndexedInts.objectStrategy
);
}
else {
bumpedDictionary = false;
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(
Arrays.asList(ImmutableConciseSet.union(theNullSet, bitmaps.get(0))),
Iterables.skip(bitmaps, 1)
),
ConciseCompressedIndexedInts.objectStrategy
);
}
}
else {
bumpedDictionary = false;
}
final VSizeIndexed finalMultiValCol = multiValCol;
singleValCol = VSizeIndexedInts.fromList(
new AbstractList<Integer>()
@@ -406,7 +458,8 @@
@Override
public Integer get(int index)
{
return finalMultiValCol.get(index).get(0);
final VSizeIndexedInts ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
@Override
@@ -423,7 +476,7 @@
}
builder.addSerde(
new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmapIndexes.get(dimension))
new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmaps)
);
final ColumnDescriptor serdeficator = builder.build();
@@ -587,7 +640,7 @@
.setType(ValueType.COMPLEX)
.setComplexColumn(
new ComplexColumnPartSupplier(
(GenericIndexed) metricHolder.complexType, metricHolder.getTypeName()
metricHolder.getTypeName(), (GenericIndexed) metricHolder.complexType
)
)
.build()
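
For intuition about the dictionary "bump" in the v8-to-v9 conversion above: when a single-valued dimension has null rows and its sorted dictionary lacks a null entry, the converter prepends null (id 0) and shifts every other id up by one; otherwise it unions the null bitmap into position 0. A self-contained sketch of that bookkeeping, with plain collections standing in for GenericIndexed, VSizeIndexed, and ConciseSet; all names and values here are illustrative:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullBumpSketch
{
  public static void main(String[] args)
  {
    // Dictionary for one dimension, sorted, with no explicit null entry.
    List<String> dictionary = new ArrayList<String>(Arrays.asList("a", "b"));

    // Rows for that dimension; an empty list is a "null" row.
    List<List<String>> rows = Arrays.asList(
        Collections.singletonList("a"),
        Collections.<String>emptyList(),
        Collections.singletonList("b")
    );

    // Pass 1: detect null rows (the ConciseSet above plays this role).
    boolean hasNulls = false;
    for (List<String> row : rows) {
      if (row.isEmpty()) {
        hasNulls = true;
      }
    }

    // "Bump" the dictionary so id 0 means null, shifting every other id by one.
    boolean bumpedDictionary = hasNulls && dictionary.get(0) != null;
    if (bumpedDictionary) {
      dictionary.add(0, null);
    }

    // Pass 2: build the single-value column; null rows map to id 0.
    int[] singleValCol = new int[rows.size()];
    for (int i = 0; i < rows.size(); i++) {
      List<String> row = rows.get(i);
      singleValCol[i] = row.isEmpty() ? 0 : dictionary.indexOf(row.get(0));
    }

    System.out.println(dictionary);                    // [null, a, b]
    System.out.println(Arrays.toString(singleValCol)); // [1, 0, 2]
  }
}

With rows [a], [], [b] the dictionary becomes [null, a, b] and the single-value column [1, 0, 2], which is the same adjustment the ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0) expression above performs.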

View File

@@ -38,10 +38,10 @@ import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ParserUtils;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.indexer.rollup.DataRollupSpec;
import com.metamx.druid.input.MapBasedInputRow;
import org.apache.commons.io.FileUtils;
@@ -359,7 +359,7 @@ public class IndexGeneratorJob implements Jobby
log.info("%,d lines completed.", lineCount);
List<MMappedIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
@@ -389,9 +389,9 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(finalFile);
for (File file : toMerge) {
indexes.add(IndexIO.mapDir(file));
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = IndexMerger.mergeMMapped(
mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
{
@Override

View File

@@ -21,22 +21,23 @@ package com.metamx.druid.merger.common.index;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import com.metamx.druid.Query;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.PlumberSchool;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.realtime.Sink;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -129,16 +130,22 @@ public class YeOldePlumberSchool implements PlumberSchool
} else if(spilled.size() == 1) {
fileToUpload = Iterables.getOnlyElement(spilled);
} else {
List<MMappedIndex> indexes = Lists.newArrayList();
List<QueryableIndex> indexes = Lists.newArrayList();
for (final File oneSpill : spilled) {
indexes.add(IndexIO.mapDir(oneSpill));
indexes.add(IndexIO.loadIndex(oneSpill));
}
fileToUpload = new File(tmpSegmentDir, "merged");
IndexMerger.mergeMMapped(indexes, schema.getAggregators(), fileToUpload);
IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
final DataSegment segmentToUpload = theSink.getSegment().withVersion(version);
// Map merged segment so we can extract dimensions
final QueryableIndex mappedSegment = IndexIO.loadIndex(fileToUpload);
final DataSegment segmentToUpload = theSink.getSegment()
.withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
.withVersion(version);
segmentPusher.push(fileToUpload, segmentToUpload);
log.info(

View File

@@ -30,7 +30,7 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.IndexableAdapter;
import com.metamx.druid.index.v1.MMappedIndexAdapter;
import com.metamx.druid.index.v1.QueryableIndexIndexableAdapter;
import com.metamx.druid.index.v1.Rowboat;
import com.metamx.druid.index.v1.RowboatFilteringIndexAdapter;
import org.codehaus.jackson.annotate.JsonCreator;
@@ -90,8 +90,8 @@ public class AppendTask extends MergeTask
for (final SegmentToMergeHolder holder : segmentsToMerge) {
adapters.add(
new RowboatFilteringIndexAdapter(
new MMappedIndexAdapter(
IndexIO.mapDir(holder.getFile())
new QueryableIndexIndexableAdapter(
IndexIO.loadIndex(holder.getFile())
),
new Predicate<Rowboat>()
{

View File

@@ -25,9 +25,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.MMappedIndex;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
@@ -57,16 +57,16 @@ public class DefaultMergeTask extends MergeTask
public File merge(final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
return IndexMerger.mergeMMapped(
return IndexMerger.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),
new Function<File, MMappedIndex>()
new Function<File, QueryableIndex>()
{
@Override
public MMappedIndex apply(@Nullable File input)
public QueryableIndex apply(@Nullable File input)
{
try {
return IndexIO.mapDir(input);
return IndexIO.loadIndex(input);
}
catch (Exception e) {
throw Throwables.propagate(e);

View File

@@ -39,6 +39,7 @@ import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.emitter.EmittingLogger;
import com.netflix.curator.framework.CuratorFramework;
@@ -52,7 +53,7 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -88,6 +89,7 @@ public class RemoteTaskRunner implements TaskRunner
private final ScheduledExecutorService scheduledExec;
private final RetryPolicyFactory retryPolicyFactory;
private final ScalingStrategy strategy;
private final WorkerSetupManager workerSetupManager;
// all workers that exist in ZK
private final Map<String, WorkerWrapper> zkWorkers = new ConcurrentHashMap<String, WorkerWrapper>();
@@ -109,7 +111,8 @@ public class RemoteTaskRunner implements TaskRunner
PathChildrenCache workerPathCache,
ScheduledExecutorService scheduledExec,
RetryPolicyFactory retryPolicyFactory,
ScalingStrategy strategy
ScalingStrategy strategy,
WorkerSetupManager workerSetupManager
)
{
this.jsonMapper = jsonMapper;
@@ -119,6 +122,7 @@ public class RemoteTaskRunner implements TaskRunner
this.scheduledExec = scheduledExec;
this.retryPolicyFactory = retryPolicyFactory;
this.strategy = strategy;
this.workerSetupManager = workerSetupManager;
}
@LifecycleStart
@@ -144,7 +148,7 @@ public class RemoteTaskRunner implements TaskRunner
Worker.class
);
log.info("Worker[%s] removed!", worker.getHost());
removeWorker(worker.getHost());
removeWorker(worker);
}
}
}
@@ -169,7 +173,7 @@ public class RemoteTaskRunner implements TaskRunner
public void run()
{
if (currentlyTerminating.isEmpty()) {
if (zkWorkers.size() <= config.getMinNumWorkers()) {
if (zkWorkers.size() <= workerSetupManager.getWorkerSetupData().getMinNumWorkers()) {
return;
}
@@ -180,7 +184,7 @@ public class RemoteTaskRunner implements TaskRunner
new Predicate<WorkerWrapper>()
{
@Override
public boolean apply(@Nullable WorkerWrapper input)
public boolean apply(WorkerWrapper input)
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
@@ -196,9 +200,9 @@ public class RemoteTaskRunner implements TaskRunner
new Function<WorkerWrapper, String>()
{
@Override
public String apply(@Nullable WorkerWrapper input)
public String apply(WorkerWrapper input)
{
return input.getWorker().getHost();
return input.getWorker().getIp();
}
}
)
@@ -218,7 +222,7 @@ public class RemoteTaskRunner implements TaskRunner
}
log.info(
"[%s] still terminating. Wait for all nodes to terminate before trying again.",
"%s still terminating. Wait for all nodes to terminate before trying again.",
currentlyTerminating
);
}
@@ -368,7 +372,7 @@ public class RemoteTaskRunner implements TaskRunner
private void addWorker(final Worker worker)
{
try {
currentlyProvisioning.remove(worker.getHost());
currentlyProvisioning.removeAll(strategy.ipLookup(Arrays.<String>asList(worker.getIp())));
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
@@ -388,8 +392,7 @@ public class RemoteTaskRunner implements TaskRunner
synchronized (statusLock) {
try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) ||
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))
{
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus;
@@ -399,7 +402,7 @@ public class RemoteTaskRunner implements TaskRunner
event.getData().getData(), TaskStatus.class
);
if(!taskStatus.getId().equals(taskId)) {
if (!taskStatus.getId().equals(taskId)) {
// Sanity check
throw new ISE(
"Worker[%s] status id does not match payload id: %s != %s",
@@ -408,7 +411,8 @@ public class RemoteTaskRunner implements TaskRunner
taskStatus.getId()
);
}
} catch (Exception e) {
}
catch (Exception e) {
log.warn(e, "Worker[%s] wrote bogus status for task: %s", worker.getHost(), taskId);
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
throw Throwables.propagate(e);
@@ -446,7 +450,8 @@ public class RemoteTaskRunner implements TaskRunner
}
}
}
} catch(Exception e) {
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost())
.addData("znode", event.getData().getPath())
@@ -478,22 +483,22 @@
* When an ephemeral worker node disappears from ZK, we have to make sure there are no tasks still assigned
* to the worker. If tasks remain, they are retried.
*
* @param workerId - id of the removed worker
* @param worker - the removed worker
*/
private void removeWorker(final String workerId)
private void removeWorker(final Worker worker)
{
currentlyTerminating.remove(workerId);
currentlyTerminating.remove(worker.getHost());
WorkerWrapper workerWrapper = zkWorkers.get(workerId);
WorkerWrapper workerWrapper = zkWorkers.get(worker.getHost());
if (workerWrapper != null) {
try {
Set<String> tasksToRetry = Sets.newHashSet(workerWrapper.getRunningTasks());
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), workerId)));
tasksToRetry.addAll(cf.getChildren().forPath(JOINER.join(config.getTaskPath(), worker.getHost())));
for (String taskId : tasksToRetry) {
TaskWrapper taskWrapper = tasks.get(taskId);
if (taskWrapper != null) {
retryTask(new CleanupPaths(workerId, taskId), tasks.get(taskId));
retryTask(new CleanupPaths(worker.getHost(), taskId), tasks.get(taskId));
}
}
@@ -503,7 +508,7 @@
log.error(e, "Failed to cleanly remove worker[%s]");
}
}
zkWorkers.remove(workerId);
zkWorkers.remove(worker.getHost());
}
private WorkerWrapper findWorkerForTask()
@@ -526,7 +531,9 @@
public boolean apply(WorkerWrapper input)
{
return (!input.isAtCapacity() &&
input.getWorker().getVersion().compareTo(config.getMinWorkerVersion()) >= 0);
input.getWorker()
.getVersion()
.compareTo(workerSetupManager.getWorkerSetupData().getMinVersion()) >= 0);
}
}
)
@@ -551,7 +558,7 @@
}
log.info(
"[%s] still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
"%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.",
currentlyProvisioning
);
}

View File

@@ -26,24 +26,7 @@ import org.skife.config.Default;
*/
public abstract class EC2AutoScalingStrategyConfig
{
@Config("druid.indexer.amiId")
public abstract String getAmiId();
@Config("druid.indexer.worker.port")
@Default("8080")
public abstract String getWorkerPort();
@Config("druid.indexer.instanceType")
public abstract String getInstanceType();
@Config("druid.indexer.minNumInstancesToProvision")
@Default("1")
public abstract int getMinNumInstancesToProvision();
@Config("druid.indexer.maxNumInstancesToProvision")
@Default("1")
public abstract int getMaxNumInstancesToProvision();
@Config("druid.indexer.userDataFile")
public abstract String getUserDataFile();
}

View File

@@ -37,15 +37,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Default("2012-01-01T00:55:00.000Z")
public abstract DateTime getTerminateResourcesOriginDateTime();
@Config("druid.indexer.minWorkerVersion")
public abstract String getMinWorkerVersion();
@Config("druid.indexer.minNumWorkers")
@Default("1")
public abstract int getMinNumWorkers();
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("1")
@Default("10000")
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration")

View File

@@ -0,0 +1,39 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.config;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class WorkerSetupManagerConfig
{
@Config("druid.indexer.configTable")
public abstract String getConfigTable();
@Config("druid.indexer.workerSetupConfigName")
public abstract String getWorkerSetupConfigName();
@Config("druid.indexer.poll.duration")
@Default("PT1M")
public abstract Duration getPollDuration();
}

View File

@@ -49,6 +49,9 @@ import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@@ -68,12 +71,11 @@ import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import com.metamx.druid.merger.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.NoopScalingStrategy;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@@ -138,6 +140,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
private CuratorFramework curatorFramework = null;
private ScheduledExecutorFactory scheduledExecutorFactory = null;
private IndexerZkConfig indexerZkConfig;
private WorkerSetupManager workerSetupManager = null;
private TaskRunnerFactory taskRunnerFactory = null;
private TaskMaster taskMaster = null;
private Server server = null;
@@ -165,14 +168,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
return this;
}
public void setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
{
this.mergerDBCoordinator = mergerDBCoordinator;
return this;
}
public void setTaskQueue(TaskQueue taskQueue)
public IndexerCoordinatorNode setTaskQueue(TaskQueue taskQueue)
{
this.taskQueue = taskQueue;
return this;
}
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
@@ -187,9 +192,16 @@
return this;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
public IndexerCoordinatorNode setWorkerSetupManager(WorkerSetupManager workerSetupManager)
{
this.workerSetupManager = workerSetupManager;
return this;
}
public IndexerCoordinatorNode setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory)
{
this.taskRunnerFactory = taskRunnerFactory;
return this;
}
public void init() throws Exception
@@ -208,6 +220,7 @@
initializeJacksonSubtypes();
initializeCurator();
initializeIndexerZkConfig();
initializeWorkerSetupManager();
initializeTaskRunnerFactory();
initializeTaskMaster();
initializeServer();
@@ -226,7 +239,8 @@
jsonMapper,
config,
emitter,
taskQueue
taskQueue,
workerSetupManager
)
);
@@ -468,6 +482,27 @@
}
}
public void initializeWorkerSetupManager()
{
if (workerSetupManager == null) {
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
workerSetupManager = new WorkerSetupManager(
dbi, Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("WorkerSetupManagerExec--%d")
.build()
), jsonMapper, workerSetupManagerConfig
);
}
lifecycle.addManagedInstance(workerSetupManager);
}
public void initializeTaskRunnerFactory()
{
if (taskRunnerFactory == null) {
@@ -497,7 +532,8 @@
PropUtils.getProperty(props, "com.metamx.aws.secretKey")
)
),
configFactory.build(EC2AutoScalingStrategyConfig.class)
configFactory.build(EC2AutoScalingStrategyConfig.class),
workerSetupManager
);
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopScalingStrategy();
@@ -512,7 +548,8 @@
new PathChildrenCache(curatorFramework, indexerZkConfig.getAnnouncementPath(), true),
retryScheduledExec,
new RetryPolicyFactory(configFactory.build(RetryPolicyConfig.class)),
strategy
strategy,
workerSetupManager
);
}
};

View File

@@ -28,6 +28,8 @@ import com.metamx.druid.merger.common.task.MergeTask;
import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter;
import javax.ws.rs.Consumes;
@@ -48,18 +50,21 @@ public class IndexerCoordinatorResource
private final IndexerCoordinatorConfig config;
private final ServiceEmitter emitter;
private final TaskQueue tasks;
private final WorkerSetupManager workerSetupManager;
@Inject
public IndexerCoordinatorResource(
IndexerCoordinatorConfig config,
ServiceEmitter emitter,
TaskQueue tasks
TaskQueue tasks,
WorkerSetupManager workerSetupManager
) throws Exception
{
this.config = config;
this.emitter = emitter;
this.tasks = tasks;
this.workerSetupManager = workerSetupManager;
}
@POST
@@ -115,4 +120,25 @@
{
return Response.ok(ImmutableMap.of("task", taskid)).build();
}
@GET
@Path("/worker/setup")
@Produces("application/json")
public Response getWorkerSetupData()
{
return Response.ok(workerSetupManager.getWorkerSetupData()).build();
}
@POST
@Path("/worker/setup")
@Consumes("application/json")
public Response setWorkerSetupData(
final WorkerSetupData workerSetupData
)
{
if (!workerSetupManager.setWorkerSetupData(workerSetupData)) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
return Response.ok().build();
}
}

View File

@@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.inject.Provides;
import com.metamx.druid.merger.coordinator.TaskQueue;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
@@ -38,18 +39,21 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
private final ServiceEmitter emitter;
private final TaskQueue tasks;
private final WorkerSetupManager workerSetupManager;
public IndexerCoordinatorServletModule(
ObjectMapper jsonMapper,
IndexerCoordinatorConfig indexerCoordinatorConfig,
ServiceEmitter emitter,
TaskQueue tasks
TaskQueue tasks,
WorkerSetupManager workerSetupManager
)
{
this.jsonMapper = jsonMapper;
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
this.emitter = emitter;
this.tasks = tasks;
this.workerSetupManager = workerSetupManager;
}
@Override
@@ -60,6 +64,7 @@
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskQueue.class).toInstance(tasks);
bind(WorkerSetupManager.class).toInstance(workerSetupManager);
serve("/*").with(GuiceContainer.class);
}

View File

@@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import java.util.List;

View File

@@ -24,7 +24,6 @@ import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
@@ -32,11 +31,14 @@ import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
/**
@@ -48,31 +50,45 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
private final ObjectMapper jsonMapper;
private final AmazonEC2Client amazonEC2Client;
private final EC2AutoScalingStrategyConfig config;
private final WorkerSetupManager workerSetupManager;
public EC2AutoScalingStrategy(
ObjectMapper jsonMapper,
AmazonEC2Client amazonEC2Client,
EC2AutoScalingStrategyConfig config
EC2AutoScalingStrategyConfig config,
WorkerSetupManager workerSetupManager
)
{
this.jsonMapper = jsonMapper;
this.amazonEC2Client = amazonEC2Client;
this.config = config;
this.workerSetupManager = workerSetupManager;
}
@Override
public AutoScalingData<Instance> provision()
{
try {
WorkerSetupData setupData = workerSetupManager.getWorkerSetupData();
EC2NodeData workerConfig = setupData.getNodeData();
log.info("Creating new instance(s)...");
RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest(
config.getAmiId(),
config.getMinNumInstancesToProvision(),
config.getMaxNumInstancesToProvision()
workerConfig.getAmiId(),
workerConfig.getMinInstances(),
workerConfig.getMaxInstances()
)
.withInstanceType(InstanceType.fromValue(config.getInstanceType()))
.withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile())))
.withInstanceType(workerConfig.getInstanceType())
.withSecurityGroupIds(workerConfig.getSecurityGroupIds())
.withKeyName(workerConfig.getKeyName())
.withUserData(
Base64.encodeBase64String(
jsonMapper.writeValueAsBytes(
setupData.getUserData()
)
)
)
);
List<String> instanceIds = Lists.transform(
@@ -80,7 +96,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>()
{
@Override
public String apply(@Nullable Instance input)
public String apply(Instance input)
{
return input.getInstanceId();
}
@@ -95,9 +111,9 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>()
{
@Override
public String apply(@Nullable Instance input)
public String apply(Instance input)
{
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
return input.getInstanceId();
}
}
),
@@ -112,12 +128,12 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> terminate(List<String> nodeIds)
public AutoScalingData<Instance> terminate(List<String> ids)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("private-ip-address", nodeIds)
new Filter("private-ip-address", ids)
)
);
@@ -135,7 +151,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
new Function<Instance, String>()
{
@Override
public String apply(@Nullable Instance input)
public String apply(Instance input)
{
return input.getInstanceId();
}
@@ -146,13 +162,13 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
return new AutoScalingData<Instance>(
Lists.transform(
instances,
new Function<Instance, String>()
ids,
new Function<String, String>()
{
@Override
public String apply(@Nullable Instance input)
public String apply(@Nullable String input)
{
return String.format("%s:%s", input.getPrivateIpAddress(), config.getWorkerPort());
return String.format("%s:%s", input, config.getWorkerPort());
}
}
),
@@ -165,4 +181,36 @@
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
DescribeInstancesResult result = amazonEC2Client.describeInstances(
new DescribeInstancesRequest()
.withFilters(
new Filter("private-ip-address", ips)
)
);
List<Instance> instances = Lists.newArrayList();
for (Reservation reservation : result.getReservations()) {
instances.addAll(reservation.getInstances());
}
List<String> retVal = Lists.transform(
instances,
new Function<Instance, String>()
{
@Override
public String apply(Instance input)
{
return input.getInstanceId();
}
}
);
log.info("Performing lookup: %s --> %s", ips, retVal);
return retVal;
}
}
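
The user data embedded in the RunInstancesRequest is now the Base64 encoding of the JSON-serialized GalaxyUserData defined later in this commit. A round-trip sketch, assuming the class is on the classpath in the same package; the field values are illustrative:

import org.apache.commons.codec.binary.Base64;
import org.codehaus.jackson.map.ObjectMapper;

public class UserDataRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper jsonMapper = new ObjectMapper();

    // What provision() embeds in the EC2 request (values illustrative).
    GalaxyUserData userData = new GalaxyUserData("prod", "v1", "worker");
    String encoded = Base64.encodeBase64String(jsonMapper.writeValueAsBytes(userData));

    // An instance-side agent would reverse it.
    GalaxyUserData decoded = jsonMapper.readValue(
        Base64.decodeBase64(encoded), GalaxyUserData.class
    );
    System.out.println(decoded.getEnv()); // prod
  }
}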

View File

@@ -1,3 +1,22 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.scaling;
import com.metamx.emitter.EmittingLogger;
@@ -24,4 +43,11 @@ public class NoopScalingStrategy implements ScalingStrategy<String>
log.info("If I were a real strategy I'd terminate %s now", nodeIds);
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
log.info("I'm not a real strategy so I'm returning what I got %s", ips);
return ips;
}
}

View File

@@ -27,5 +27,12 @@ public interface ScalingStrategy<T>
{
public AutoScalingData<T> provision();
public AutoScalingData<T> terminate(List<String> nodeIds);
public AutoScalingData<T> terminate(List<String> ids);
/**
* Provides a lookup from node ip addresses to node ids
* @param ips the ip addresses of the nodes to look up
* @return the corresponding node ids
*/
public List<String> ipLookup(List<String> ips);
}

View File

@@ -0,0 +1,91 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class EC2NodeData
{
private final String amiId;
private final String instanceType;
private final int minInstances;
private final int maxInstances;
private final List<String> securityGroupIds;
private final String keyName;
@JsonCreator
public EC2NodeData(
@JsonProperty("amiId") String amiId,
@JsonProperty("instanceType") String instanceType,
@JsonProperty("minInstances") int minInstances,
@JsonProperty("maxInstances") int maxInstances,
@JsonProperty("securityGroupIds") List<String> securityGroupIds,
@JsonProperty("keyName") String keyName
)
{
this.amiId = amiId;
this.instanceType = instanceType;
this.minInstances = minInstances;
this.maxInstances = maxInstances;
this.securityGroupIds = securityGroupIds;
this.keyName = keyName;
}
@JsonProperty
public String getAmiId()
{
return amiId;
}
@JsonProperty
public String getInstanceType()
{
return instanceType;
}
@JsonProperty
public int getMinInstances()
{
return minInstances;
}
@JsonProperty
public int getMaxInstances()
{
return maxInstances;
}
@JsonProperty
public List<String> getSecurityGroupIds()
{
return securityGroupIds;
}
@JsonProperty
public String getKeyName()
{
return keyName;
}
}

View File

@@ -0,0 +1,62 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
/**
*/
public class GalaxyUserData
{
public final String env;
public final String version;
public final String type;
@JsonCreator
public GalaxyUserData(
@JsonProperty("env") String env,
@JsonProperty("version") String version,
@JsonProperty("type") String type
)
{
this.env = env;
this.version = version;
this.type = type;
}
@JsonProperty
public String getEnv()
{
return env;
}
@JsonProperty
public String getVersion()
{
return version;
}
@JsonProperty
public String getType()
{
return type;
}
}

View File

@@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import java.util.List;
/**
*/
public class WorkerSetupData
{
private final String minVersion;
private final int minNumWorkers;
private final EC2NodeData nodeData;
private final GalaxyUserData userData;
@JsonCreator
public WorkerSetupData(
@JsonProperty("minVersion") String minVersion,
@JsonProperty("minNumWorkers") int minNumWorkers,
@JsonProperty("nodeData") EC2NodeData nodeData,
@JsonProperty("userData") GalaxyUserData userData
)
{
this.minVersion = minVersion;
this.minNumWorkers = minNumWorkers;
this.nodeData = nodeData;
this.userData = userData;
}
@JsonProperty
public String getMinVersion()
{
return minVersion;
}
@JsonProperty
public int getMinNumWorkers()
{
return minNumWorkers;
}
@JsonProperty
public EC2NodeData getNodeData()
{
return nodeData;
}
@JsonProperty
public GalaxyUserData getUserData()
{
return userData;
}
}
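
Together with EC2NodeData and GalaxyUserData above, this is the payload the new GET/POST /worker/setup endpoints exchange. A sketch that builds and serializes one, assuming the three classes are in the same package; constructor arguments follow the @JsonProperty names, and every value is illustrative:

import org.codehaus.jackson.map.ObjectMapper;
import java.util.Arrays;

public class WorkerSetupPayload
{
  public static void main(String[] args) throws Exception
  {
    WorkerSetupData setup = new WorkerSetupData(
        "0",                                        // minVersion
        1,                                          // minNumWorkers
        new EC2NodeData(
            "ami-12345678",                         // amiId
            "m1.large",                             // instanceType
            1,                                      // minInstances
            1,                                      // maxInstances
            Arrays.asList("sg-0a1b2c"),             // securityGroupIds
            "druid-keypair"                         // keyName
        ),
        new GalaxyUserData("prod", "v1", "worker")  // env, version, type
    );
    System.out.println(new ObjectMapper().writeValueAsString(setup));
  }
}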

View File

@@ -0,0 +1,226 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.setup;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.coordinator.config.WorkerSetupManagerConfig;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class WorkerSetupManager
{
private static final Logger log = new Logger(WorkerSetupManager.class);
private final DBI dbi;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService exec;
private final WorkerSetupManagerConfig config;
private final Object lock = new Object();
private volatile AtomicReference<WorkerSetupData> workerSetupData = new AtomicReference<WorkerSetupData>(null);
private volatile boolean started = false;
public WorkerSetupManager(
DBI dbi,
ScheduledExecutorService exec,
ObjectMapper jsonMapper,
WorkerSetupManagerConfig config
)
{
this.dbi = dbi;
this.exec = exec;
this.jsonMapper = jsonMapper;
this.config = config;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.getPollDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
);
started = true;
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
started = false;
}
}
public void poll()
{
try {
List<WorkerSetupData> setupDataList = dbi.withHandle(
new HandleCallback<List<WorkerSetupData>>()
{
@Override
public List<WorkerSetupData> withHandle(Handle handle) throws Exception
{
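// the payload column holds the JSON-serialized WorkerSetupData parsed below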
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE name = :name",
config.getConfigTable()
)
)
.bind("name", config.getWorkerSetupConfigName())
.fold(
Lists.<WorkerSetupData>newArrayList(),
new Folder3<ArrayList<WorkerSetupData>, Map<String, Object>>()
{
@Override
public ArrayList<WorkerSetupData> fold(
ArrayList<WorkerSetupData> workerNodeConfigurations,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
// stringObjectMap lowercases column names, so Jackson serde may fail
workerNodeConfigurations.add(
jsonMapper.readValue(
MapUtils.getString(stringObjectMap, "payload"),
WorkerSetupData.class
)
);
return workerNodeConfigurations;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
if (setupDataList.isEmpty()) {
throw new ISE("WTF?! No configuration found for worker nodes!");
} else if (setupDataList.size() != 1) {
throw new ISE("WTF?! Found more than one configuration for worker nodes");
}
workerSetupData.set(setupDataList.get(0));
}
catch (Exception e) {
log.error(e, "Exception while polling for worker setup data!");
}
}
@SuppressWarnings("unchecked")
public WorkerSetupData getWorkerSetupData()
{
synchronized (lock) {
if (!started) {
throw new ISE("Must start WorkerSetupManager first!");
}
return workerSetupData.get();
}
}
public boolean setWorkerSetupData(final WorkerSetupData value)
{
synchronized (lock) {
try {
if (!started) {
throw new ISE("Must start WorkerSetupManager first!");
}
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
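// MySQL-style upsert: insert the named payload, or overwrite it when a row
// with this name already exists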
handle.createStatement(
String.format(
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
config.getConfigTable()
)
)
.bind("name", config.getWorkerSetupConfigName())
.bind("payload", jsonMapper.writeValueAsString(value))
.execute();
return null;
}
}
);
workerSetupData.set(value);
}
catch (Exception e) {
log.error(e, "Exception updating worker config");
return false;
}
}
return true;
}
}
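
A hypothetical wiring sketch, not part of this commit (the JDBC URL, credentials, pool size, and the config instance are all assumptions):

DBI dbi = new DBI("jdbc:mysql://localhost:3306/druid", "user", "password");
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
WorkerSetupManager workerSetupManager = new WorkerSetupManager(dbi, exec, new DefaultObjectMapper(), config);
workerSetupManager.start(); // schedules poll() every config.getPollDuration()
WorkerSetupData current = workerSetupManager.getWorkerSetupData(); // latest polled snapshot
workerSetupManager.stop();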

View File

@ -17,6 +17,8 @@ import com.metamx.druid.merger.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.merger.coordinator.config.RetryPolicyConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStrategy;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
@ -62,6 +64,7 @@ public class RemoteTaskRunnerTest
private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner;
private TaskMonitor taskMonitor;
private WorkerSetupManager workerSetupManager;
private ScheduledExecutorService scheduledExec;
@ -69,7 +72,6 @@ public class RemoteTaskRunnerTest
private Worker worker1;
@Before
public void setUp() throws Exception
{
@ -141,9 +143,10 @@ public class RemoteTaskRunnerTest
{
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
try {
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
fail("ISE expected");
}
catch (ISE expected) {
}
}
@ -333,6 +336,17 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception
{
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
new WorkerSetupData(
"0",
0,
null,
null
)
);
EasyMock.replay(workerSetupManager);
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
@ -341,7 +355,8 @@ public class RemoteTaskRunnerTest
pathChildrenCache,
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new TestScalingStrategy(),
workerSetupManager
);
// Create a single worker and wait for things to be ready
@ -389,6 +404,12 @@ public class RemoteTaskRunnerTest
{
return null;
}
@Override
public List<String> ipLookup(List<String> ips)
{
return ips;
}
}
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
@ -405,18 +426,6 @@ public class RemoteTaskRunnerTest
return null;
}
@Override
public String getMinWorkerVersion()
{
return "0";
}
@Override
public int getMinNumWorkers()
{
return 0;
}
@Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{

View File

@ -27,8 +27,13 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.merger.coordinator.setup.EC2NodeData;
import com.metamx.druid.merger.coordinator.setup.GalaxyUserData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@ -52,6 +57,7 @@ public class EC2AutoScalingStrategyTest
private Reservation reservation;
private Instance instance;
private EC2AutoScalingStrategy strategy;
private WorkerSetupManager workerSetupManager;
@Before
public void setUp() throws Exception
@ -60,6 +66,7 @@ public class EC2AutoScalingStrategyTest
runInstancesResult = EasyMock.createMock(RunInstancesResult.class);
describeInstancesResult = EasyMock.createMock(DescribeInstancesResult.class);
reservation = EasyMock.createMock(Reservation.class);
workerSetupManager = EasyMock.createMock(WorkerSetupManager.class);
instance = new Instance()
.withInstanceId(INSTANCE_ID)
@ -69,44 +76,16 @@ public class EC2AutoScalingStrategyTest
strategy = new EC2AutoScalingStrategy(
new DefaultObjectMapper(),
amazonEC2Client,
new EC2AutoScalingStrategyConfig()
{
@Override
public String getWorkerPort()
{
return "8080";
}
},
workerSetupManager
);
}
@ -117,11 +96,22 @@ public class EC2AutoScalingStrategyTest
EasyMock.verify(runInstancesResult);
EasyMock.verify(describeInstancesResult);
EasyMock.verify(reservation);
EasyMock.verify(workerSetupManager);
}
@Test
public void testScale()
{
EasyMock.expect(workerSetupManager.getWorkerSetupData()).andReturn(
new WorkerSetupData(
"0",
0,
new EC2NodeData(AMI_ID, INSTANCE_ID, 1, 1, Lists.<String>newArrayList(), "foo"),
new GalaxyUserData("env", "version", "type")
)
);
EasyMock.replay(workerSetupManager);
EasyMock.expect(amazonEC2Client.runInstances(EasyMock.anyObject(RunInstancesRequest.class))).andReturn(
runInstancesResult
);
@ -144,9 +134,9 @@ public class EC2AutoScalingStrategyTest
Assert.assertEquals(created.getNodeIds().size(), 1);
Assert.assertEquals(created.getNodes().size(), 1);
Assert.assertEquals("theInstance", created.getNodeIds().get(0));
AutoScalingData deleted = strategy.terminate(Arrays.asList("dummyIP"));
Assert.assertEquals(deleted.getNodeIds().size(), 1);
Assert.assertEquals(deleted.getNodes().size(), 1);

View File

@ -68,7 +68,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
<version>0.19.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -84,7 +84,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>

View File

@ -308,30 +308,21 @@ public class RealtimePlumberSchool implements PlumberSchool
final File mergedFile;
try {
List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
new File(computePersistDir(schema, interval), "merged")
);
QueryableIndex index = IndexIO.loadIndex(mergedFile);
DataSegment segment = segmentPusher.push(
mergedFile,
@ -503,9 +494,7 @@ public class RealtimePlumberSchool implements PlumberSchool
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
indexToPersist.swapSegment(new QueryableIndexSegment(null, IndexIO.loadIndex(persistedFile)));
return numRows;
}

View File

@ -19,7 +19,6 @@
package com.metamx.druid.index.v1;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.kv.FlattenedArrayWriter;
@ -27,7 +26,6 @@ import com.metamx.druid.kv.IOPeon;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
/**
*/
@ -75,18 +73,12 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
{
writer.close();
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
outFile.delete();
MetricHolder.writeComplexMetric(
Files.newOutputStreamSupplier(outFile, true), metricName, serde.getTypeName(), writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}

View File

@ -24,7 +24,6 @@ import com.metamx.druid.kv.IOPeon;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
/**
*/
@ -34,8 +33,7 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
private final IOPeon ioPeon;
private final File outDir;
private CompressedFloatsSupplierSerializer writer;
public FloatMetricColumnSerializer(
String metricName,
@ -51,43 +49,30 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
@Override
public void open() throws IOException
{
writer = CompressedFloatsSupplierSerializer.create(
ioPeon, String.format("%s_little", metricName), IndexIO.BYTE_ORDER
);
writer.open();
}
@Override
public void serialize(Object obj) throws IOException
{
float val = (obj == null) ? 0 : ((Number) obj).floatValue();
writer.add(val);
}
@Override
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
outFile.delete();
MetricHolder.writeFloatMetric(
Files.newOutputStreamSupplier(outFile, true), metricName, writer
);
IndexIO.checkFileSize(outFile);
writer = null;
}
}

View File

@ -44,6 +44,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.ToLowerCaseAggregatorFactory;
import com.metamx.druid.guava.FileOutputSupplier;
import com.metamx.druid.guava.GuavaUtils;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
@ -75,8 +76,10 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
@ -139,26 +142,26 @@ public class IndexMerger
);
}
public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, new NoopProgressIndicator());
}
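
As a usage sketch (not from this commit; the segment paths are hypothetical), the new entry point merges segments that were previously persisted to disk:

QueryableIndex indexA = IndexIO.loadIndex(new File("/tmp/segment-a"));
QueryableIndex indexB = IndexIO.loadIndex(new File("/tmp/segment-b"));
File merged = IndexMerger.mergeQueryableIndex(
Arrays.asList(indexA, indexB), new AggregatorFactory[]{}, new File("/tmp/merged")
);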
public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress
) throws IOException
{
return merge(
Lists.transform(
indexes,
new Function<QueryableIndex, IndexableAdapter>()
{
@Override
public IndexableAdapter apply(final QueryableIndex input)
{
return new QueryableIndexIndexableAdapter(input);
}
}
),
@ -384,7 +387,6 @@ public class IndexMerger
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
) throws IOException
{
// TODO: make v9 index, complain to Eric when you see this, cause he should be doing it.
Map<String, String> metricTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
for (IndexableAdapter adapter : indexes) {
for (String metric : adapter.getAvailableMetrics()) {
@ -392,11 +394,13 @@ public class IndexMerger
}
}
final Interval dataInterval;
File v8OutDir = new File(outDir, "v8-tmp");
v8OutDir.mkdirs();
/************* Main index.drd file **************/
progress.progress();
long startTime = System.currentTimeMillis();
File indexFile = new File(v8OutDir, "index.drd");
FileOutputStream fileOutputStream = null;
FileChannel channel = null;
@ -426,7 +430,7 @@ public class IndexMerger
fileOutputStream = null;
}
IndexIO.checkFileSize(indexFile);
log.info("outDir[%s] completed index.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Setup Dim Conversions **************/
progress.progress();
@ -499,7 +503,7 @@ public class IndexMerger
}
dimensionCardinalities.put(dimension, count);
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
dimOuts.add(dimOut);
writer.close();
@ -514,7 +518,7 @@ public class IndexMerger
ioPeon.cleanup();
}
log.info("outDir[%s] completed dim conversions in %,d millis.", outDir, System.currentTimeMillis() - startTime);
log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
/************* Walk through data sets and merge them *************/
progress.progress();
@ -573,15 +577,11 @@ public class IndexMerger
Iterable<Rowboat> theRows = rowMergerFn.apply(boats);
CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
ioPeon, "little_end_time", IndexIO.BYTE_ORDER
);
timeWriter.open();
ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
for (String dimension : mergedDimensions) {
@ -595,7 +595,7 @@ public class IndexMerger
String metric = entry.getKey();
String typeName = entry.getValue();
if ("float".equals(typeName)) {
metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
} else {
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
@ -603,7 +603,7 @@ public class IndexMerger
throw new ISE("Unknown type[%s]", typeName);
}
metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
}
}
for (MetricColumnSerializer metWriter : metWriters) {
@ -621,8 +621,7 @@ public class IndexMerger
for (Rowboat theRow : theRows) {
progress.progress();
timeWriter.add(theRow.getTimestamp());
final Object[] metrics = theRow.getMetrics();
for (int i = 0; i < metrics.length; ++i) {
@ -650,7 +649,7 @@ public class IndexMerger
if ((++rowCount % 500000) == 0) {
log.info(
"outDir[%s] walked 500,000/%,d rows in %,d millis.", outDir, rowCount, System.currentTimeMillis() - time
"outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
);
time = System.currentTimeMillis();
}
@ -660,17 +659,11 @@ public class IndexMerger
rowNumConversion.rewind();
}
final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
timeFile.delete();
OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
timeWriter.closeAndConsolidate(out);
IndexIO.checkFileSize(timeFile);
for (int i = 0; i < mergedDimensions.size(); ++i) {
forwardDimWriters.get(i).close();
@ -684,7 +677,7 @@ public class IndexMerger
ioPeon.cleanup();
log.info(
"outDir[%s] completed walk through of %,d rows in %,d millis.",
v8OutDir,
rowCount,
System.currentTimeMillis() - startTime
);
@ -692,7 +685,7 @@ public class IndexMerger
/************ Create Inverted Indexes *************/
startTime = System.currentTimeMillis();
final File invertedFile = new File(v8OutDir, "inverted.drd");
Files.touch(invertedFile);
out = Files.newOutputStreamSupplier(invertedFile, true);
for (int i = 0; i < mergedDimensions.size(); ++i) {
@ -725,10 +718,7 @@ public class IndexMerger
}
ConciseSet bitset = new ConciseSet();
for (Integer row : CombiningIterable.createSplatted(convertedInverteds, Ordering.<Integer>natural().nullsFirst())) {
if (row != INVALID_ROW) {
bitset.add(row);
}
@ -744,33 +734,34 @@ public class IndexMerger
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
}
log.info("outDir[%s] completed inverted.drd in %,d millis.", outDir, System.currentTimeMillis() - startTime);
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
final ArrayList<String> expectedFiles = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
"index.drd", "inverted.drd", "time_BIG_ENDIAN.drd", "time_LITTLE_ENDIAN.drd"
"index.drd", "inverted.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
),
Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
Iterables.transform(mergedMetrics, GuavaUtils.formatFunction("met_%s_LITTLE_ENDIAN.drd")),
Iterables.transform(mergedMetrics, GuavaUtils.formatFunction("met_%s_BIG_ENDIAN.drd"))
Iterables.transform(
mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
)
)
);
Map<String, File> files = Maps.newLinkedHashMap();
for (String fileName : expectedFiles) {
files.put(fileName, new File(v8OutDir, fileName));
}
File smooshDir = new File(outDir, "smoosher");
File smooshDir = new File(v8OutDir, "smoosher");
smooshDir.mkdir();
for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
entry.getValue().delete();
}
for (File file : smooshDir.listFiles()) {
Files.move(file, new File(v8OutDir, file.getName()));
}
if (!smooshDir.delete()) {
@ -780,18 +771,21 @@ public class IndexMerger
createIndexDrdFile(
IndexIO.CURRENT_VERSION_ID,
v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
dataInterval
);
IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir);
FileUtils.deleteDirectory(v8OutDir);
return outDir;
}
private static <T extends Comparable> ArrayList<T> mergeIndexed(final List<Iterable<T>> indexedLists)
{
Set<T> retVal = Sets.newTreeSet(Ordering.<T>natural().nullsFirst());
for (Iterable<T> indexedList : indexedLists) {
for (T val : indexedList) {

View File

@ -53,6 +53,12 @@ public class MMappedIndexQueryableIndex implements QueryableIndex
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return index.getTimestamps().size();
}
@Override
public Indexed<String> getColumnNames()
{
@ -91,7 +97,7 @@ public class MMappedIndexQueryableIndex implements QueryableIndex
return new FloatColumn(metricHolder.floatType);
}
else {
return new ComplexColumnImpl(metricHolder.getTypeName(), metricHolder.getComplexType());
}
}
}

View File

@ -0,0 +1,290 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.kv.ArrayBasedIndexedInts;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.EmptyIndexedInts;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.ListIndexed;
import org.joda.time.Interval;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
/**
*/
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private final int numRows;
private final QueryableIndex input;
public QueryableIndexIndexableAdapter(QueryableIndex input)
{
this.input = input;
numRows = input.getNumRows();
}
@Override
public Interval getDataInterval()
{
return input.getDataInterval();
}
@Override
public int getNumRows()
{
return numRows;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return input.getAvailableDimensions();
}
@Override
public Indexed<String> getAvailableMetrics()
{
final Set<String> columns = Sets.newLinkedHashSet(input.getColumnNames());
final HashSet<String> dimensions = Sets.newHashSet(getAvailableDimensions());
return new ListIndexed<String>(
Lists.newArrayList(Sets.difference(columns, dimensions)),
String.class
);
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
final Column column = input.getColumn(dimension);
if (column == null) {
return null;
}
final DictionaryEncodedColumn dict = column.getDictionaryEncoding();
if (dict == null) {
return null;
}
return new Indexed<String>()
{
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return dict.getCardinality();
}
@Override
public String get(int index)
{
return dict.lookupName(index);
}
@Override
public int indexOf(String value)
{
return dict.lookupId(value);
}
@Override
public Iterator<String> iterator()
{
return IndexedIterable.create(this).iterator();
}
};
}
@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final GenericColumn timestamps = input.getTimeColumn().getGenericColumn();
final Object[] metrics;
final Map<String, DictionaryEncodedColumn> dimensions;
final int numMetrics = getAvailableMetrics().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : input.getAvailableDimensions()) {
dimensions.put(dim, input.getColumn(dim).getDictionaryEncoding());
}
final Indexed<String> availableMetrics = getAvailableMetrics();
metrics = new Object[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
final Column column = input.getColumn(availableMetrics.get(i));
final ValueType type = column.getCapabilities().getType();
switch (type) {
case FLOAT:
metrics[i] = column.getGenericColumn();
break;
case COMPLEX:
metrics[i] = column.getComplexColumn();
break;
default:
throw new ISE("Cannot handle type[%s]", type);
}
}
}
@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (Object metric : metrics) {
if (metric instanceof Closeable) {
Closeables.closeQuietly((Closeable) metric);
}
}
done = true;
}
return hasNext;
}
@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
final DictionaryEncodedColumn dict = dimensions.get(dim);
final IndexedInts dimVals;
if (dict.hasMultipleValues()) {
dimVals = dict.getMultiValueRow(currRow);
}
else {
dimVals = new ArrayBasedIndexedInts(new int[]{dict.getSingleValueRow(currRow)});
}
int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}
dims[dimIndex++] = theVals;
}
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
if (metrics[i] instanceof GenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
}
else if (metrics[i] instanceof ComplexColumn) {
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
}
}
final Rowboat retVal = new Rowboat(
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow
);
++currRow;
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
@Override
public IndexedInts getInverteds(String dimension, String value)
{
final Column column = input.getColumn(dimension);
if (column == null) {
return new EmptyIndexedInts();
}
final BitmapIndex bitmaps = column.getBitmapIndex();
if (bitmaps == null) {
return new EmptyIndexedInts();
}
return new ConciseCompressedIndexedInts(bitmaps.getConciseSet(value));
}
@Override
public String getMetricType(String metric)
{
final Column column = input.getColumn(metric);
final ValueType type = column.getCapabilities().getType();
switch (type) {
case FLOAT:
return "float";
case COMPLEX:
return column.getComplexColumn().getTypeName();
default:
throw new ISE("Unknown type[%s]", type);
}
}
}
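
An illustrative walk over a segment through this adapter (a sketch, not part of the commit; segmentDir is hypothetical):

QueryableIndex index = IndexIO.loadIndex(segmentDir);
IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
for (Rowboat row : adapter.getRows()) {
// each Rowboat carries the row's timestamp, dictionary-encoded dimension
// values, metric values, and its original row number
}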

View File

@ -277,7 +277,10 @@ public class GroupByQueryEngine
dimNames = new String[dimensionSpecs.size()];
for (int i = 0; i < dimensionSpecs.size(); ++i) {
final DimensionSpec dimSpec = dimensionSpecs.get(i);
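// makeDimensionSelector may return null for a dimension missing from the
// segment; only non-null selectors are added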
final DimensionSelector selector = cursor.makeDimensionSelector(dimSpec.getDimension());
if (selector != null) {
dimensions.add(selector);
}
dimNames[i] = dimSpec.getOutputName();
}

View File

@ -23,12 +23,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.QueryableIndex;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
public class EmptyIndexTest
{
@ -48,11 +48,11 @@ public class EmptyIndexTest
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
IndexMerger.merge(Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir);
QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir);
Assert.assertEquals("getAvailableDimensions", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
Assert.assertEquals("getAvailableMetrics", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));
Assert.assertEquals("getDataInterval", new Interval("2012-08-01/P3D"), emptyQueryableIndex.getDataInterval());
Assert.assertEquals("getReadOnlyTimestamps", 0, emptyQueryableIndex.getTimeColumn().getLength());
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.input.MapBasedInputRow;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
@ -44,11 +45,11 @@ public class IndexMergerTest
final File tempDir = Files.createTempDir();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
Assert.assertEquals(2, index.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(2, index.getColumnNames().size());
}
finally {
tempDir.delete();
@ -84,25 +85,25 @@ public class IndexMergerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
Assert.assertEquals(2, index1.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir)
);
Assert.assertEquals(3, merged.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions()));
Assert.assertEquals(2, merged.getColumnNames().size());
}
finally {
FileUtils.deleteQuietly(tempDir1);

View File

@ -119,8 +119,8 @@ public class TestIndex
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile);
mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile
)