add support for dimension compression

- adds compression for single-value dimensions using CompressedVSizeIntsIndexedSupplier
- makes dimension compression configurable via IndexSpec
- IndexSpec also enables configuring bitmap and metric compression
Xavier Léauté 2015-04-13 23:38:08 -07:00
parent bafc5114b4
commit 3a3046ccf3
39 changed files with 1154 additions and 446 deletions
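
Editor's sketch (not part of this commit): how the new IndexSpec threads through the indexing entry points touched in this diff. Only signatures that appear in the diff below are used; the class name and method names here are illustrative, and the no-arg IndexSpec() constructor is the same default fallback the tuning configs use when no indexSpec is supplied.

```java
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
import org.joda.time.Interval;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class IndexSpecUsageSketch
{
  // Persist an in-memory index; the spec decides bitmap type and
  // dimension/metric compression for the resulting segment files.
  public static File persistWithDefaults(IncrementalIndex index, Interval interval, File outDir)
      throws IOException
  {
    final IndexSpec indexSpec = new IndexSpec(); // defaults, same as the DEFAULT_INDEX_SPEC fallbacks below
    return IndexMerger.persist(index, interval, outDir, indexSpec);
  }

  // Merge already-built segments; the same spec is applied to the merged output.
  public static File mergeWithSpec(
      List<QueryableIndex> indexes, AggregatorFactory[] aggs, File outDir, IndexSpec indexSpec
  ) throws IOException
  {
    return IndexMerger.mergeQueryableIndex(indexes, aggs, outDir, indexSpec);
  }
}
```

On the JSON side, the same object is exposed as an optional `indexSpec` property of the tuning configs and tasks changed in this diff (for example HadoopTuningConfig, IndexTask.IndexTuningConfig, and the merge/append/convert tasks), each of which falls back to a default IndexSpec when the field is omitted.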

View File

@ -17,41 +17,6 @@
package io.druid.indexer;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
@ -71,6 +36,40 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
/**
*/
@ -247,6 +246,10 @@ public class HadoopDruidIndexerConfig
return schema.getTuningConfig().getPartitionsSpec();
}
public IndexSpec getIndexSpec() {
return schema.getTuningConfig().getIndexSpec();
}
public boolean isOverwriteFiles()
{
return schema.getTuningConfig().isOverwriteFiles();

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.IndexSpec;
import io.druid.segment.data.BitmapSerde;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.DateTime;
@ -36,6 +38,7 @@ public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
@ -47,6 +50,7 @@ public class HadoopTuningConfig implements TuningConfig
new DateTime().toString(),
DEFAULT_PARTITIONS_SPEC,
DEFAULT_SHARD_SPECS,
DEFAULT_INDEX_SPEC,
DEFAULT_ROW_FLUSH_BOUNDARY,
false,
true,
@ -65,6 +69,7 @@ public class HadoopTuningConfig implements TuningConfig
private final String version;
private final PartitionsSpec partitionsSpec;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final int rowFlushBoundary;
private final boolean leaveIntermediate;
private final Boolean cleanupOnFailure;
@ -83,6 +88,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") IndexSpec indexSpec,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
@ -100,6 +106,7 @@ public class HadoopTuningConfig implements TuningConfig
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.rowFlushBoundary = maxRowsInMemory == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemory;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
@ -139,6 +146,12 @@ public class HadoopTuningConfig implements TuningConfig
return shardSpecs;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public int getRowFlushBoundary()
{
@ -210,6 +223,7 @@ public class HadoopTuningConfig implements TuningConfig
version,
partitionsSpec,
shardSpecs,
indexSpec,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
@ -231,6 +245,7 @@ public class HadoopTuningConfig implements TuningConfig
ver,
partitionsSpec,
shardSpecs,
indexSpec,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,
@ -252,6 +267,7 @@ public class HadoopTuningConfig implements TuningConfig
version,
partitionsSpec,
specs,
indexSpec,
rowFlushBoundary,
leaveIntermediate,
cleanupOnFailure,

View File

@ -17,61 +17,6 @@
package io.druid.indexer;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
@ -89,6 +34,58 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexSpec;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
*/
@ -294,7 +291,7 @@ public class IndexGeneratorJob implements Jobby
public static class IndexGeneratorReducer extends Reducer<BytesWritable, Writable, BytesWritable, Text>
{
private HadoopDruidIndexerConfig config;
protected HadoopDruidIndexerConfig config;
private List<String> metricNames = Lists.newArrayList();
private InputRowParser parser;
@ -318,7 +315,7 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
return IndexMaker.persist(
index, interval, file, progressIndicator
index, interval, file, config.getIndexSpec(), progressIndicator
);
}
@ -330,7 +327,7 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
return IndexMaker.mergeQueryableIndex(
indexes, aggs, file, progressIndicator
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
}

View File

@ -20,6 +20,7 @@ package io.druid.indexer;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
@ -67,7 +68,7 @@ public class LegacyIndexGeneratorJob extends IndexGeneratorJob
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.persist(index, interval, file, progressIndicator);
return IndexMerger.persist(index, interval, file, config.getIndexSpec(), progressIndicator);
}
@Override
@ -78,7 +79,7 @@ public class LegacyIndexGeneratorJob extends IndexGeneratorJob
ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator);
return IndexMerger.mergeQueryableIndex(indexes, aggs, file, config.getIndexSpec(), progressIndicator);
}
}
}

View File

@ -173,6 +173,7 @@ public class HadoopDruidIndexerConfigTest
null,
ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
null,
null,
false,
false,
false,

View File

@ -158,7 +158,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
}
// Map merged segment so we can extract dimensions
@ -205,7 +205,8 @@ public class YeOldePlumberSchool implements PlumberSchool
try {
IndexMaker.persist(
indexToPersist.getIndex(),
dirToPersist
dirToPersist,
config.getIndexSpec()
);
indexToPersist.swapSegment(null);

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
@ -46,14 +47,19 @@ import java.util.Map;
*/
public class AppendTask extends MergeTaskBase
{
private final IndexSpec indexSpec;
@JsonCreator
public AppendTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
super(id, dataSource, segments);
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}
@Override
@ -120,7 +126,7 @@ public class AppendTask extends MergeTaskBase
);
}
return IndexMerger.append(adapters, outDir);
return IndexMerger.append(adapters, outDir, indexSpec);
}
@Override

View File

@ -45,6 +45,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSpec;
@ -336,7 +337,20 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
).findPlumber(
schema,
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null, null),
new RealtimeTuningConfig(
null,
null,
null,
null,
null,
null,
null,
shardSpec,
ingestionSchema.getTuningConfig().getIndexSpec(),
null,
null,
null
),
metrics
);
@ -416,7 +430,7 @@ public class IndexTask extends AbstractFixedIntervalTask
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null) : tuningConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null) : tuningConfig;
}
@Override
@ -466,21 +480,25 @@ public class IndexTask extends AbstractFixedIntervalTask
{
private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private final int targetPartitionSize;
private final int rowFlushBoundary;
private final int numShards;
private final IndexSpec indexSpec;
@JsonCreator
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec
)
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.numShards = numShards == null ? -1 : numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
Preconditions.checkArgument(
this.targetPartitionSize == -1 || this.numShards == -1,
"targetPartitionsSize and shardCount both cannot be set"
@ -504,5 +522,11 @@ public class IndexTask extends AbstractFixedIntervalTask
{
return numShards;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.timeline.DataSegment;
@ -41,17 +42,20 @@ public class MergeTask extends MergeTaskBase
{
@JsonIgnore
private final List<AggregatorFactory> aggregators;
private final IndexSpec indexSpec;
@JsonCreator
public MergeTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
super(id, dataSource, segments);
this.aggregators = aggregators;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}
@Override
@ -76,7 +80,8 @@ public class MergeTask extends MergeTaskBase
}
),
aggregators.toArray(new AggregatorFactory[aggregators.size()]),
outDir
outDir,
indexSpec
);
}

View File

@ -31,6 +31,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -53,11 +54,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
@JsonIgnore
private final DataSegment segment;
private final IndexSpec indexSpec;
public static VersionConverterTask create(String dataSource, Interval interval)
{
final String id = makeId(dataSource, interval);
return new VersionConverterTask(id, id, dataSource, interval, null);
return new VersionConverterTask(id, id, dataSource, interval, null, null);
}
public static VersionConverterTask create(DataSegment segment)
@ -65,7 +67,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
final Interval interval = segment.getInterval();
final String dataSource = segment.getDataSource();
final String id = makeId(dataSource, interval);
return new VersionConverterTask(id, id, dataSource, interval, segment);
return new VersionConverterTask(id, id, dataSource, interval, segment, null);
}
private static String makeId(String dataSource, Interval interval)
@ -81,7 +83,8 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("segment") DataSegment segment
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
if (id == null) {
@ -91,7 +94,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
return create(segment);
}
}
return new VersionConverterTask(id, groupId, dataSource, interval, segment);
return new VersionConverterTask(id, groupId, dataSource, interval, segment, indexSpec);
}
private VersionConverterTask(
@ -99,11 +102,13 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
String groupId,
String dataSource,
Interval interval,
DataSegment segment
DataSegment segment,
IndexSpec indexSpec
)
{
super(id, groupId, dataSource, interval);
this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}
@Override
@ -138,7 +143,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
{
final Integer segmentVersion = segment.getBinaryVersion();
if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
return new SubTask(getGroupId(), segment);
return new SubTask(getGroupId(), segment, indexSpec);
}
log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
@ -156,7 +161,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
}
} else {
log.info("I'm in a subless mood.");
convertSegment(toolbox, segment);
convertSegment(toolbox, segment, indexSpec);
}
return success();
}
@ -184,11 +189,13 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
{
@JsonIgnore
private final DataSegment segment;
private final IndexSpec indexSpec;
@JsonCreator
public SubTask(
@JsonProperty("groupId") String groupId,
@JsonProperty("segment") DataSegment segment
@JsonProperty("segment") DataSegment segment,
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
super(
@ -204,6 +211,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
segment.getInterval()
);
this.segment = segment;
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
}
@JsonProperty
@ -222,12 +230,12 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
log.info("Subs are good! Italian BMT and Meatball are probably my favorite.");
convertSegment(toolbox, segment);
convertSegment(toolbox, segment, indexSpec);
return success();
}
}
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment)
private static void convertSegment(TaskToolbox toolbox, final DataSegment segment, IndexSpec indexSpec)
throws SegmentLoadingException, IOException
{
log.info("Converting segment[%s]", segment);
@ -250,7 +258,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
final File location = localSegments.get(segment);
final File outLocation = new File(location, "v9_out");
if (IndexIO.convertSegment(location, outLocation)) {
if (IndexIO.convertSegment(location, outLocation, indexSpec)) {
final int outVersion = IndexIO.getVersionFromDir(outLocation);
// Appending to the version makes a new version that inherits most comparability parameters of the original

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;
import io.druid.indexing.common.task.MergeTask;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -52,7 +53,8 @@ public class TestMergeTask extends MergeTask
0
)
),
Lists.<AggregatorFactory>newArrayList()
Lists.<AggregatorFactory>newArrayList(),
new IndexSpec()
);
}
@ -63,10 +65,11 @@ public class TestMergeTask extends MergeTask
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("indexSpec") IndexSpec indexSpec
)
{
super(id, dataSource, segments, aggregators);
super(id, dataSource, segments, aggregators, indexSpec);
this.id = id;
}

View File

@ -35,6 +35,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -54,6 +55,8 @@ import java.util.List;
public class IndexTaskTest
{
private final IndexSpec indexSpec = new IndexSpec();
@Test
public void testDeterminePartitions() throws Exception
{
@ -108,7 +111,8 @@ public class IndexTaskTest
new IndexTask.IndexTuningConfig(
2,
0,
null
null,
indexSpec
)
),
new DefaultObjectMapper()

View File

@ -31,6 +31,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -53,6 +54,8 @@ public class TaskSerdeTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final IndexSpec indexSpec = new IndexSpec();
@Test
public void testIndexTaskSerde() throws Exception
{
@ -70,7 +73,7 @@ public class TaskSerdeTest
)
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, -1, -1)
new IndexTask.IndexTuningConfig(10000, -1, -1, indexSpec)
),
jsonMapper
);
@ -107,7 +110,8 @@ public class TaskSerdeTest
),
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("cnt")
)
),
indexSpec
);
final String json = jsonMapper.writeValueAsString(task);
@ -179,7 +183,8 @@ public class TaskSerdeTest
{
final VersionConverterTask.SubTask task = new VersionConverterTask.SubTask(
"myGroupId",
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build(),
indexSpec
);
final String json = jsonMapper.writeValueAsString(task);
@ -229,6 +234,7 @@ public class TaskSerdeTest
null,
1,
new NoneShardSpec(),
indexSpec,
false,
false,
null
@ -277,7 +283,8 @@ public class TaskSerdeTest
"foo",
ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build()
)
),
indexSpec
);
final String json = jsonMapper.writeValueAsString(task);

View File

@ -61,6 +61,7 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.loading.DataSegmentArchiver;
@ -100,9 +101,11 @@ import java.util.Set;
@RunWith(Parameterized.class)
public class IngestSegmentFirehoseFactoryTest
{
@Parameterized.Parameters(name = "{1}")
public static Collection<Object[]> constructorFeeder() throws IOException
{
final IndexSpec indexSpec = new IndexSpec();
final HeapMemoryTaskStorage ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
@ -132,7 +135,7 @@ public class IngestSegmentFirehoseFactoryTest
if (!persistDir.mkdirs() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
}
IndexMerger.persist(index, persistDir);
IndexMerger.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)

View File

@ -63,6 +63,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
@ -109,6 +110,7 @@ public class TaskLifecycleTest
private MockIndexerMetadataStorageCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private IndexSpec indexSpec;
private static MockIndexerMetadataStorageCoordinator newMockMDC()
{
@ -248,6 +250,7 @@ public class TaskLifecycleTest
"{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\"}",
TaskQueueConfig.class
);
indexSpec = new IndexSpec();
ts = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
@ -361,7 +364,7 @@ public class TaskLifecycleTest
IR("2010-01-02T01", "a", "c", 1)
)
)),
new IndexTask.IndexTuningConfig(10000, -1, -1)),
new IndexTask.IndexTuningConfig(10000, -1, -1, indexSpec)),
TestUtils.MAPPER
);
@ -415,7 +418,7 @@ public class TaskLifecycleTest
)
),
new IndexTask.IndexIOConfig(newMockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, -1, -1)
new IndexTask.IndexTuningConfig(10000, -1, -1, indexSpec)
),
TestUtils.MAPPER
);

View File

@ -33,7 +33,8 @@ public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
@Override
public long getMaxZnodeBytes()
{
return 1000;
// make sure this is large enough, otherwise RemoteTaskRunnerTest might fail unexpectedly
return 10 * 1024;
}
@Override

View File

@ -35,6 +35,7 @@ import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.timeline.DataSegment;
import junit.framework.Assert;
import org.easymock.EasyMock;
@ -64,6 +65,8 @@ public class SimpleResourceManagementStrategyTest
{
autoScaler = EasyMock.createMock(AutoScaler.class);
final IndexSpec indexSpec = new IndexSpec();
testTask = new TestMergeTask(
"task1",
"dummyDs",
@ -80,7 +83,8 @@ public class SimpleResourceManagementStrategyTest
0
)
),
Lists.<AggregatorFactory>newArrayList()
Lists.<AggregatorFactory>newArrayList(),
indexSpec
);
simpleResourceManagementConfig = new SimpleResourceManagementConfig()

View File

@ -20,6 +20,7 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -61,8 +62,12 @@ import io.druid.segment.data.BitmapSerde;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
@ -121,10 +126,12 @@ public class IndexIO
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final ObjectMapper mapper;
private static final BitmapSerdeFactory bitmapSerdeFactory;
protected static final ColumnConfig columnConfig;
@Deprecated // specify bitmap type in IndexSpec instead
protected static final BitmapSerdeFactory CONFIGURED_BITMAP_SERDE_FACTORY;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of(
@ -140,6 +147,7 @@ public class IndexIO
);
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
// this property is deprecated, use IndexSpec instead
JsonConfigProvider.bind(binder, "druid.processing.bitmap", BitmapSerdeFactory.class);
}
}
@ -147,7 +155,7 @@ public class IndexIO
);
mapper = injector.getInstance(ObjectMapper.class);
columnConfig = injector.getInstance(ColumnConfig.class);
bitmapSerdeFactory = injector.getInstance(BitmapSerdeFactory.class);
CONFIGURED_BITMAP_SERDE_FACTORY = injector.getInstance(BitmapSerdeFactory.class);
}
public static QueryableIndex loadIndex(File inDir) throws IOException
@ -186,7 +194,7 @@ public class IndexIO
}
}
public static boolean convertSegment(File toConvert, File converted) throws IOException
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(toConvert);
@ -205,11 +213,12 @@ public class IndexIO
log.info("Old version, re-persisting.");
IndexMerger.append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
converted
converted,
indexSpec
);
return true;
case 8:
DefaultIndexIOHandler.convertV8toV9(toConvert, converted);
DefaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
return true;
default:
log.info("Version[%s], skipping.", version);
@ -328,7 +337,7 @@ public class IndexIO
return retVal;
}
public static void convertV8toV9(File v8Dir, File v9Dir) throws IOException
public static void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec) throws IOException
{
log.info("Converting v8[%s] to v9[%s]", v8Dir, v9Dir);
@ -353,6 +362,8 @@ public class IndexIO
Map<String, GenericIndexed<ImmutableBitmap>> bitmapIndexes = Maps.newHashMap();
final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd");
BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
while (invertedBuffer.hasRemaining()) {
final String dimName = serializerUtils.readString(invertedBuffer);
bitmapIndexes.put(
@ -404,7 +415,7 @@ public class IndexIO
continue;
}
VSizeIndexedInts singleValCol = null;
List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
@ -468,41 +479,61 @@ public class IndexIO
}
final VSizeIndexed finalMultiValCol = multiValCol;
singleValCol = VSizeIndexedInts.fromList(
new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
final VSizeIndexedInts ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
final VSizeIndexedInts ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
@Override
public int size()
{
return finalMultiValCol.size();
}
};
@Override
public int size()
{
return finalMultiValCol.size();
}
},
dictionary.size()
);
multiValCol = null;
} else {
builder.setHasMultipleValues(true);
}
builder.addSerde(
new DictionaryEncodedColumnPartSerde(
dictionary,
singleValCol,
multiValCol,
bitmapSerdeFactory,
bitmaps,
spatialIndex
)
);
final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompressionStrategy();
final ColumnDescriptor serdeficator = builder.build();
final DictionaryEncodedColumnPartSerde.Builder columnPartBuilder = DictionaryEncodedColumnPartSerde
.builder()
.withDictionary(dictionary)
.withBitmapSerdeFactory(bitmapSerdeFactory)
.withBitmaps(bitmaps)
.withSpatialIndex(spatialIndex)
.withByteOrder(BYTE_ORDER);
if (singleValCol != null) {
if (compressionStrategy != null) {
columnPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
singleValCol,
dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
BYTE_ORDER,
compressionStrategy
)
);
} else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
}
} else {
if(compressionStrategy != null) {
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension);
}
columnPartBuilder.withMultiValuedColumn(multiValCol);
}
final ColumnDescriptor serdeficator = builder
.addSerde(columnPartBuilder.build())
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
@ -663,7 +694,9 @@ public class IndexIO
new DictionaryEncodedColumnSupplier(
index.getDimValueLookup(dimension),
null,
index.getDimColumn(dimension),
Suppliers.<IndexedMultivalue<IndexedInts>>ofInstance(
index.getDimColumn(dimension)
),
columnConfig.columnCacheSizeBytes()
)
)

View File

@ -32,7 +32,6 @@ import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.collections.bitmap.BitmapFactory;
@ -53,7 +52,6 @@ import io.druid.collections.CombiningIterable;
import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
@ -63,6 +61,7 @@ import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -108,28 +107,15 @@ public class IndexMaker
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
private static final ObjectMapper mapper;
private static final BitmapSerdeFactory bitmapSerdeFactory;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.processing.bitmap", BitmapSerdeFactory.class);
}
}
)
);
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.<Module>of());
mapper = injector.getInstance(ObjectMapper.class);
bitmapSerdeFactory = injector.getInstance(BitmapSerdeFactory.class);
}
public static File persist(final IncrementalIndex index, File outDir) throws IOException
public static File persist(final IncrementalIndex index, File outDir, final IndexSpec indexSpec) throws IOException
{
return persist(index, index.getInterval(), outDir);
return persist(index, index.getInterval(), outDir, indexSpec);
}
/**
@ -145,16 +131,18 @@ public class IndexMaker
public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir
File outDir,
final IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, new LoggingProgressIndicator(outDir.toString()));
return persist(index, dataInterval, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
}
public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
@ -186,26 +174,28 @@ public class IndexMaker
new IncrementalIndexAdapter(
dataInterval,
index,
bitmapSerdeFactory.getBitmapFactory()
indexSpec.getBitmapSerdeFactory().getBitmapFactory()
)
),
index.getMetricAggs(),
outDir,
indexSpec,
progress
);
}
public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, new LoggingProgressIndicator(outDir.toString()));
return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
}
public static File mergeQueryableIndex(
List<QueryableIndex> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
@ -223,21 +213,23 @@ public class IndexMaker
),
metricAggs,
outDir,
indexSpec,
progress
);
}
public static File merge(
List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir
List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec
) throws IOException
{
return merge(adapters, metricAggs, outDir, new LoggingProgressIndicator(outDir.toString()));
return merge(adapters, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
}
public static File merge(
List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
File outDir,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
@ -326,21 +318,23 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn);
}
public static File append(
final List<IndexableAdapter> adapters,
File outDir
) throws IOException
{
return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()));
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
}
public static File append(
final List<IndexableAdapter> adapters,
final File outDir,
final ProgressIndicator progress
final IndexSpec indexSpec
) throws IOException
{
return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()), indexSpec);
}
public static File append(
final List<IndexableAdapter> adapters,
final File outDir,
final ProgressIndicator progress,
final IndexSpec indexSpec
) throws IOException
{
FileUtils.deleteDirectory(outDir);
@ -409,7 +403,7 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn);
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
}
private static File makeIndexFiles(
@ -418,7 +412,8 @@ public class IndexMaker
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
{
progress.start();
@ -501,14 +496,15 @@ public class IndexMaker
theRows,
columnCapabilities,
dimensionValuesLookup,
rowNumConversions
rowNumConversions,
indexSpec
);
progress.progress();
makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount);
makeMetricColumns(v9Smoosher, progress, theRows, mergedMetrics, valueTypes, metricTypeNames, rowCount, indexSpec);
progress.progress();
makeIndexBinary(v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress);
makeIndexBinary(v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec);
v9Smoosher.close();
@ -768,7 +764,8 @@ public class IndexMaker
final Iterable<Rowboat> theRows,
final Map<String, ColumnCapabilitiesImpl> columnCapabilities,
final Map<String, Iterable<String>> dimensionValuesLookup,
final List<IntBuffer> rowNumConversions
final List<IntBuffer> rowNumConversions,
final IndexSpec indexSpec
) throws IOException
{
final String dimSection = "make dimension columns";
@ -790,7 +787,9 @@ public class IndexMaker
dimension,
columnCapabilities,
dimensionValuesLookup,
rowNumConversions
rowNumConversions,
indexSpec.getBitmapSerdeFactory(),
indexSpec.getDimensionCompressionStrategy()
);
dimIndex++;
}
@ -806,7 +805,9 @@ public class IndexMaker
final String dimension,
final Map<String, ColumnCapabilitiesImpl> columnCapabilities,
final Map<String, Iterable<String>> dimensionValuesLookup,
final List<IntBuffer> rowNumConversions
final List<IntBuffer> rowNumConversions,
final BitmapSerdeFactory bitmapSerdeFactory,
final CompressedObjectStrategy.CompressionStrategy compressionStrategy
) throws IOException
{
final String section = String.format("make %s", dimension);
@ -825,8 +826,8 @@ public class IndexMaker
dimBuilder.setHasMultipleValues(hasMultipleValues);
// make dimension columns
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = null;
List<Integer> singleValCol;
final VSizeIndexed multiValCol;
ColumnDictionaryEntryStore adder = hasMultipleValues
? new MultiValColumnDictionaryEntryStore()
@ -881,6 +882,8 @@ public class IndexMaker
);
final int dictionarySize = dictionary.size();
singleValCol = null;
multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable
.create(vals)
@ -935,6 +938,7 @@ public class IndexMaker
);
} else {
final int dictionarySize = dictionary.size();
singleValCol = null;
multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable
.create(vals)
@ -973,6 +977,7 @@ public class IndexMaker
}
} else {
final int dictionarySize = dictionary.size();
singleValCol = null;
multiValCol = VSizeIndexed.fromIterable(
FunctionalIterable
.create(vals)
@ -1008,50 +1013,62 @@ public class IndexMaker
Iterables.concat(nullList, dimensionValues),
GenericIndexed.stringStrategy
);
singleValCol = VSizeIndexedInts.fromList(
new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val + 1;
}
multiValCol = null;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val + 1;
}
@Override
public int size()
{
return vals.size();
}
}, dictionary.size()
);
@Override
public int size()
{
return vals.size();
}
};
} else {
singleValCol = VSizeIndexedInts.fromList(
new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val;
}
multiValCol = null;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
Integer val = vals.get(index);
if (val == null) {
return 0;
}
return val;
}
@Override
public int size()
{
return vals.size();
}
}, dictionary.size()
);
@Override
public int size()
{
return vals.size();
}
};
}
} else {
singleValCol = VSizeIndexedInts.fromList(vals, dictionary.size());
multiValCol = null;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
return vals.get(index);
}
@Override
public int size()
{
return vals.size();
}
};
}
}
@ -1174,16 +1191,38 @@ public class IndexMaker
log.info("Completed dimension[%s] with cardinality[%,d]. Starting write.", dimension, dictionary.size());
final DictionaryEncodedColumnPartSerde.Builder dimPartBuilder = DictionaryEncodedColumnPartSerde
.builder()
.withDictionary(dictionary)
.withBitmapSerdeFactory(bitmapSerdeFactory)
.withBitmaps(bitmaps)
.withSpatialIndex(spatialIndex)
.withByteOrder(IndexIO.BYTE_ORDER);
if (singleValCol != null) {
if (compressionStrategy != null) {
dimPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
singleValCol,
dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
IndexIO.BYTE_ORDER,
compressionStrategy
)
);
} else {
dimPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
}
} else {
if(compressionStrategy != null) {
log.info("Compression not supported for multi-value dimensions, defaulting to `uncompressed` for dimension[%s]", dimension);
}
dimPartBuilder.withMultiValuedColumn(multiValCol);
}
writeColumn(
v9Smoosher,
new DictionaryEncodedColumnPartSerde(
dictionary,
singleValCol,
multiValCol,
bitmapSerdeFactory,
bitmaps,
spatialIndex
),
dimPartBuilder.build(),
dimBuilder,
dimension
);
@ -1198,7 +1237,8 @@ public class IndexMaker
final List<String> mergedMetrics,
final Map<String, ValueType> valueTypes,
final Map<String, String> metricTypeNames,
final int rowCount
final int rowCount,
final IndexSpec indexSpec
) throws IOException
{
final String metSection = "make metric columns";
@ -1206,7 +1246,17 @@ public class IndexMaker
int metIndex = 0;
for (String metric : mergedMetrics) {
makeMetricColumn(v9Smoosher, progress, theRows, metIndex, metric, valueTypes, metricTypeNames, rowCount);
makeMetricColumn(
v9Smoosher,
progress,
theRows,
metIndex,
metric,
valueTypes,
metricTypeNames,
rowCount,
indexSpec.getMetricCompressionStrategy()
);
metIndex++;
}
progress.stopSection(metSection);
@ -1220,7 +1270,8 @@ public class IndexMaker
final String metric,
final Map<String, ValueType> valueTypes,
final Map<String, String> metricTypeNames,
final int rowCount
final int rowCount,
final CompressedObjectStrategy.CompressionStrategy compressionStrategy
) throws IOException
{
final String section = String.format("make column[%s]", metric);
@ -1243,7 +1294,7 @@ public class IndexMaker
CompressedFloatsIndexedSupplier compressedFloats = CompressedFloatsIndexedSupplier.fromFloatBuffer(
FloatBuffer.wrap(arr),
IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
compressionStrategy
);
writeColumn(
@ -1267,7 +1318,7 @@ public class IndexMaker
CompressedLongsIndexedSupplier compressedLongs = CompressedLongsIndexedSupplier.fromLongBuffer(
LongBuffer.wrap(arr),
IndexIO.BYTE_ORDER,
CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
compressionStrategy
);
writeColumn(
@ -1324,7 +1375,8 @@ public class IndexMaker
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Set<String> skippedDimensions,
final ProgressIndicator progress
final ProgressIndicator progress,
final IndexSpec indexSpec
) throws IOException
{
final String section = "building index.drd";
@ -1350,7 +1402,7 @@ public class IndexMaker
GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.stringStrategy);
GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.stringStrategy);
final String bitmapSerdeFactoryType = mapper.writeValueAsString(bitmapSerdeFactory);
final String bitmapSerdeFactoryType = mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory());
final long numBytes = cols.getSerializedSize()
+ dims.getSerializedSize()
+ 16

View File

@ -109,7 +109,6 @@ public class IndexMerger
private static final Splitter SPLITTER = Splitter.on(",");
private static final ObjectMapper mapper;
private static final BitmapSerdeFactory bitmapSerdeFactory;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
@ -125,13 +124,12 @@ public class IndexMerger
)
);
mapper = injector.getInstance(ObjectMapper.class);
bitmapSerdeFactory = injector.getInstance(BitmapSerdeFactory.class);
}
public static File persist(final IncrementalIndex index, File outDir) throws IOException
public static File persist(final IncrementalIndex index, File outDir, IndexSpec indexSpec) throws IOException
{
return persist(index, index.getInterval(), outDir);
return persist(index, index.getInterval(), outDir, indexSpec);
}
/**
@ -146,13 +144,13 @@ public class IndexMerger
*
* @throws java.io.IOException if an IO error occurs persisting the index
*/
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir) throws IOException
public static File persist(final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec) throws IOException
{
return persist(index, dataInterval, outDir, new BaseProgressIndicator());
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
}
public static File persist(
final IncrementalIndex index, final Interval dataInterval, File outDir, ProgressIndicator progress
final IncrementalIndex index, final Interval dataInterval, File outDir, IndexSpec indexSpec, ProgressIndicator progress
) throws IOException
{
if (index.isEmpty()) {
@ -183,24 +181,25 @@ public class IndexMerger
new IncrementalIndexAdapter(
dataInterval,
index,
bitmapSerdeFactory.getBitmapFactory()
indexSpec.getBitmapSerdeFactory().getBitmapFactory()
)
),
index.getMetricAggs(),
outDir,
indexSpec,
progress
);
}
public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, new BaseProgressIndicator());
return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
public static File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress
) throws IOException
{
return merge(
@ -217,19 +216,20 @@ public class IndexMerger
),
metricAggs,
outDir,
indexSpec,
progress
);
}
public static File merge(
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec
) throws IOException
{
return merge(indexes, metricAggs, outDir, new BaseProgressIndicator());
return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
public static File merge(
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, ProgressIndicator progress
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
@ -316,18 +316,18 @@ public class IndexMerger
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn);
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
}
public static File append(
List<IndexableAdapter> indexes, File outDir
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec
) throws IOException
{
return append(indexes, outDir, new BaseProgressIndicator());
return append(indexes, outDir, indexSpec, new BaseProgressIndicator());
}
public static File append(
List<IndexableAdapter> indexes, File outDir, ProgressIndicator progress
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec, ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
@ -396,7 +396,7 @@ public class IndexMerger
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn);
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
}
private static File makeIndexFiles(
@ -405,7 +405,8 @@ public class IndexMerger
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
{
final Map<String, ValueType> valueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
@ -465,7 +466,7 @@ public class IndexMerger
dataInterval = new Interval(minTime, maxTime);
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
serializerUtils.writeString(channel, mapper.writeValueAsString(bitmapSerdeFactory));
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
}
finally {
CloseQuietly.close(channel);
@ -764,6 +765,7 @@ public class IndexMerger
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
);
@ -875,10 +877,11 @@ public class IndexMerger
v8OutDir,
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.stringStrategy),
GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.stringStrategy),
dataInterval
dataInterval,
indexSpec.getBitmapSerdeFactory()
);
IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir);
IndexIO.DefaultIndexIOHandler.convertV8toV9(v8OutDir, outDir, indexSpec);
FileUtils.deleteDirectory(v8OutDir);
return outDir;
@ -902,7 +905,8 @@ public class IndexMerger
File inDir,
GenericIndexed<String> availableDimensions,
GenericIndexed<String> availableMetrics,
Interval dataInterval
Interval dataInterval,
BitmapSerdeFactory bitmapSerdeFactory
) throws IOException
{
File indexFile = new File(inDir, "index.drd");

View File

@ -0,0 +1,180 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Set;
/**
* IndexSpec defines segment storage format options to be used at indexing time,
* such as the bitmap type and column compression formats.
*
* IndexSpec is specified as part of the TuningConfig for the corresponding index task
* (a brief usage sketch follows this class).
*/
public class IndexSpec
{
public static final String UNCOMPRESSED = "uncompressed";
public static final String DEFAULT_METRIC_COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY.name().toLowerCase();
public static final String DEFAULT_DIMENSION_COMPRESSION = CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY.name().toLowerCase();
private static final Set<String> COMPRESSION_NAMES = Sets.newHashSet(
Iterables.transform(
Arrays.asList(CompressedObjectStrategy.CompressionStrategy.values()),
new Function<CompressedObjectStrategy.CompressionStrategy, String>()
{
@Nullable
@Override
public String apply(CompressedObjectStrategy.CompressionStrategy input)
{
return input.name().toLowerCase();
}
}
)
);
private final BitmapSerdeFactory bitmapSerdeFactory;
private final String dimensionCompression;
private final String metricCompression;
/**
* Creates an IndexSpec with default parameters
*/
public IndexSpec()
{
this(null, null, null);
}
/**
* Creates an IndexSpec with the given storage format settings.
*
* @param bitmapSerdeFactory type of bitmap to use (e.g. roaring or concise), null to use the default.
* Defaults to the bitmap type specified by the (deprecated) "druid.processing.bitmap.type"
* setting, or, if none was set, uses the default {@link BitmapSerde.DefaultBitmapSerdeFactory}
*
* @param dimensionCompression compression format for dimension columns, null to use the default.
* Defaults to {@link CompressedObjectStrategy#DEFAULT_COMPRESSION_STRATEGY}; use "uncompressed" to disable compression.
*
* @param metricCompression compression format for metric columns, null to use the default.
* Defaults to {@link CompressedObjectStrategy#DEFAULT_COMPRESSION_STRATEGY}
*/
@JsonCreator
public IndexSpec(
@JsonProperty("bitmap") BitmapSerdeFactory bitmapSerdeFactory,
@JsonProperty("dimensionCompression") String dimensionCompression,
@JsonProperty("metricCompression") String metricCompression
)
{
Preconditions.checkArgument(dimensionCompression == null || dimensionCompression.equals(UNCOMPRESSED) || COMPRESSION_NAMES.contains(dimensionCompression),
"Unknown compression type[%s]", dimensionCompression);
Preconditions.checkArgument(metricCompression == null || COMPRESSION_NAMES.contains(metricCompression),
"Unknown compression type[%s]", metricCompression);
this.bitmapSerdeFactory = bitmapSerdeFactory != null ? bitmapSerdeFactory : IndexIO.CONFIGURED_BITMAP_SERDE_FACTORY;
this.metricCompression = metricCompression;
this.dimensionCompression = dimensionCompression;
}
@JsonProperty("bitmap")
public BitmapSerdeFactory getBitmapSerdeFactory()
{
return bitmapSerdeFactory;
}
@JsonProperty("dimensionCompression")
public String getDimensionCompression()
{
return dimensionCompression;
}
@JsonProperty("metricCompression")
public String getMetricCompression()
{
return metricCompression;
}
public CompressedObjectStrategy.CompressionStrategy getMetricCompressionStrategy()
{
return CompressedObjectStrategy.CompressionStrategy.valueOf(
(metricCompression == null ? DEFAULT_METRIC_COMPRESSION : metricCompression).toUpperCase()
);
}
public CompressedObjectStrategy.CompressionStrategy getDimensionCompressionStrategy()
{
return dimensionCompression == null ?
dimensionCompressionStrategyForName(DEFAULT_DIMENSION_COMPRESSION) :
dimensionCompressionStrategyForName(dimensionCompression);
}
private static CompressedObjectStrategy.CompressionStrategy dimensionCompressionStrategyForName(String compression)
{
return compression.equals(UNCOMPRESSED) ? null :
CompressedObjectStrategy.CompressionStrategy.valueOf(compression.toUpperCase());
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IndexSpec indexSpec = (IndexSpec) o;
if (bitmapSerdeFactory != null
? !bitmapSerdeFactory.equals(indexSpec.bitmapSerdeFactory)
: indexSpec.bitmapSerdeFactory != null) {
return false;
}
if (dimensionCompression != null
? !dimensionCompression.equals(indexSpec.dimensionCompression)
: indexSpec.dimensionCompression != null) {
return false;
}
return !(metricCompression != null
? !metricCompression.equals(indexSpec.metricCompression)
: indexSpec.metricCompression != null);
}
@Override
public int hashCode()
{
int result = bitmapSerdeFactory != null ? bitmapSerdeFactory.hashCode() : 0;
result = 31 * result + (dimensionCompression != null ? dimensionCompression.hashCode() : 0);
result = 31 * result + (metricCompression != null ? metricCompression.hashCode() : 0);
return result;
}
}
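/*
 * Illustrative usage sketch (not part of this change; names below are hypothetical).
 * It constructs an IndexSpec and round-trips it through Jackson, mirroring the shapes
 * exercised in IndexSpecTest:
 *
 *   import com.fasterxml.jackson.databind.ObjectMapper;
 *   import io.druid.jackson.DefaultObjectMapper;
 *   import io.druid.segment.data.RoaringBitmapSerdeFactory;
 *
 *   ObjectMapper mapper = new DefaultObjectMapper();
 *   // roaring bitmaps, LZ4-compressed dimensions, default metric compression
 *   IndexSpec spec = new IndexSpec(new RoaringBitmapSerdeFactory(), "lz4", null);
 *
 *   // expected to serialize to JSON along the lines of
 *   // {"bitmap":{"type":"roaring"},"dimensionCompression":"lz4","metricCompression":null}
 *   String json = mapper.writeValueAsString(spec);
 *
 *   // and to read back as an equal IndexSpec (equals/hashCode are defined above)
 *   IndexSpec roundTripped = mapper.readValue(json, IndexSpec.class);
 */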

View File

@ -215,6 +215,11 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
CloseQuietly.close((Closeable) metric);
}
}
for (Object dimension : dimensions.values()) {
if(dimension instanceof Closeable) {
CloseQuietly.close((Closeable) dimension);
}
}
done = true;
}
return hasNext;

View File

@ -17,24 +17,25 @@
package io.druid.segment.column;
import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import java.io.IOException;
/**
*/
public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
public class SimpleDictionaryEncodedColumn
implements DictionaryEncodedColumn
{
private final VSizeIndexedInts column;
private final VSizeIndexed multiValueColumn;
private final IndexedInts column;
private final IndexedMultivalue<IndexedInts> multiValueColumn;
private final CachingIndexed<String> cachedLookups;
public SimpleDictionaryEncodedColumn(
VSizeIndexedInts singleValueColumn,
VSizeIndexed multiValueColumn,
IndexedInts singleValueColumn,
IndexedMultivalue<IndexedInts> multiValueColumn,
CachingIndexed<String> cachedLookups
)
{
@ -88,6 +89,13 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn
@Override
public void close() throws IOException
{
cachedLookups.close();
CloseQuietly.close(cachedLookups);
if(column != null) {
column.close();
}
if(multiValueColumn != null) {
multiValueColumn.close();
}
}
}

View File

@ -19,66 +19,230 @@ package io.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.common.Pair;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.BitmapSerde;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.WritableSupplier;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
*/
public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
{
private final boolean isSingleValued;
private static final int NO_FLAGS = 0;
enum Feature {
MULTI_VALUE;
public boolean isSet(int flags) { return (getMask() & flags) != 0; }
public int getMask() { return (1 << ordinal()); }
}
enum VERSION
{
UNCOMPRESSED_SINGLE_VALUE, // 0x0
UNCOMPRESSED_MULTI_VALUE, // 0x1
COMPRESSED; // 0x2
public static VERSION fromByte(byte b) {
final VERSION[] values = VERSION.values();
Preconditions.checkArgument(b < values.length, "Unsupported dictionary column version[%s]", b);
return values[b];
}
public byte asByte() {
return (byte)this.ordinal();
}
}
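/*
 * Resulting serialized layout (see write(), numBytes() and read() below):
 *   1 byte  version  (0x0 uncompressed single-value, 0x1 uncompressed multi-value, 0x2 compressed)
 *   4 bytes flags    (only when version >= COMPRESSED; currently just the MULTI_VALUE bit)
 *   dictionary (GenericIndexed<String>)
 *   single-valued or multi-valued column, selected by the MULTI_VALUE flag
 *   bitmap indexes (GenericIndexed<ImmutableBitmap>)
 *   spatial index, present only if bytes remain
 */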
public static class Builder {
private VERSION version = null;
private int flags = NO_FLAGS;
private GenericIndexed<String> dictionary = null;
private WritableSupplier<IndexedInts> singleValuedColumn = null;
private WritableSupplier<IndexedMultivalue<IndexedInts>> multiValuedColumn = null;
private BitmapSerdeFactory bitmapSerdeFactory = null;
private GenericIndexed<ImmutableBitmap> bitmaps = null;
private ImmutableRTree spatialIndex = null;
private ByteOrder byteOrder = null;
private Builder()
{
}
public Builder withDictionary(GenericIndexed<String> dictionary)
{
this.dictionary = dictionary;
return this;
}
public Builder withBitmapSerdeFactory(BitmapSerdeFactory bitmapSerdeFactory)
{
this.bitmapSerdeFactory = bitmapSerdeFactory;
return this;
}
public Builder withBitmaps(GenericIndexed<ImmutableBitmap> bitmaps)
{
this.bitmaps = bitmaps;
return this;
}
public Builder withSpatialIndex(ImmutableRTree spatialIndex)
{
this.spatialIndex = spatialIndex;
return this;
}
public Builder withByteOrder(ByteOrder byteOrder)
{
this.byteOrder = byteOrder;
return this;
}
public Builder withSingleValuedColumn(VSizeIndexedInts singleValuedColumn)
{
Preconditions.checkState(multiValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.UNCOMPRESSED_SINGLE_VALUE;
this.singleValuedColumn = singleValuedColumn.asWritableSupplier();
return this;
}
public Builder withSingleValuedColumn(CompressedVSizeIntsIndexedSupplier singleValuedColumn)
{
Preconditions.checkState(multiValuedColumn == null, "Cannot set both singleValuedColumn and multiValuedColumn");
this.version = VERSION.COMPRESSED;
this.singleValuedColumn = singleValuedColumn;
return this;
}
public Builder withMultiValuedColumn(VSizeIndexed multiValuedColumn)
{
Preconditions.checkState(singleValuedColumn == null, "Cannot set both multiValuedColumn and singleValuedColumn");
this.version = VERSION.UNCOMPRESSED_MULTI_VALUE;
this.flags |= Feature.MULTI_VALUE.getMask();
this.multiValuedColumn = multiValuedColumn.asWritableSupplier();
return this;
}
public DictionaryEncodedColumnPartSerde build()
{
Preconditions.checkArgument(
singleValuedColumn != null ^ multiValuedColumn != null,
"Exactly one of singleValCol[%s] or multiValCol[%s] must be set",
singleValuedColumn, multiValuedColumn
);
return new DictionaryEncodedColumnPartSerde(
version,
flags,
dictionary,
singleValuedColumn,
multiValuedColumn,
bitmapSerdeFactory,
bitmaps,
spatialIndex,
byteOrder
);
}
}
public static Builder builder()
{
return new Builder();
}
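// Illustrative only (variable names are hypothetical): a column writer assembles the serde
// for an uncompressed single-valued dimension roughly as follows; passing a
// CompressedVSizeIntsIndexedSupplier to withSingleValuedColumn() selects the COMPRESSED version instead.
//
//   DictionaryEncodedColumnPartSerde serde = DictionaryEncodedColumnPartSerde
//       .builder()
//       .withDictionary(dictionary)                  // GenericIndexed<String>
//       .withSingleValuedColumn(singleValuedInts)    // VSizeIndexedInts
//       .withBitmapSerdeFactory(bitmapSerdeFactory)
//       .withBitmaps(bitmaps)                        // GenericIndexed<ImmutableBitmap>
//       .withByteOrder(ByteOrder.nativeOrder())
//       .build();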
private final BitmapSerdeFactory bitmapSerdeFactory;
private final ByteOrder byteOrder;
private final GenericIndexed<String> dictionary;
private final VSizeIndexedInts singleValuedColumn;
private final VSizeIndexed multiValuedColumn;
private final WritableSupplier<IndexedInts> singleValuedColumn;
private final WritableSupplier<IndexedMultivalue<IndexedInts>> multiValuedColumn;
private final GenericIndexed<ImmutableBitmap> bitmaps;
private final ImmutableRTree spatialIndex;
private final int flags;
private final VERSION version;
private final long size;
@JsonCreator
public DictionaryEncodedColumnPartSerde(
GenericIndexed<String> dictionary,
VSizeIndexedInts singleValCol,
VSizeIndexed multiValCol,
BitmapSerdeFactory bitmapSerdeFactory,
GenericIndexed<ImmutableBitmap> bitmaps,
ImmutableRTree spatialIndex
@Nullable @JsonProperty("bitmapSerdeFactory") BitmapSerdeFactory bitmapSerdeFactory,
@NotNull @JsonProperty("byteOrder") ByteOrder byteOrder
)
{
this.isSingleValued = multiValCol == null;
Preconditions.checkArgument(byteOrder != null, "byte order must be specified");
this.bitmapSerdeFactory = bitmapSerdeFactory == null
? new BitmapSerde.LegacyBitmapSerdeFactory()
: bitmapSerdeFactory;
this.byteOrder = byteOrder;
// dummy values
this.dictionary = null;
this.singleValuedColumn = null;
this.multiValuedColumn = null;
this.bitmaps = null;
this.spatialIndex = null;
this.size = -1;
this.flags = 0;
this.version = VERSION.COMPRESSED;
}
private DictionaryEncodedColumnPartSerde(
VERSION version,
int flags,
GenericIndexed<String> dictionary,
WritableSupplier<IndexedInts> singleValuedColumn,
WritableSupplier<IndexedMultivalue<IndexedInts>> multiValuedColumn,
BitmapSerdeFactory bitmapSerdeFactory,
GenericIndexed<ImmutableBitmap> bitmaps,
ImmutableRTree spatialIndex,
ByteOrder byteOrder
)
{
Preconditions.checkArgument(version.compareTo(VERSION.COMPRESSED) <= 0, "Unsupported version[%s]", version);
this.bitmapSerdeFactory = bitmapSerdeFactory;
this.byteOrder = byteOrder;
this.version = version;
this.flags = flags;
this.dictionary = dictionary;
this.singleValuedColumn = singleValCol;
this.multiValuedColumn = multiValCol;
this.singleValuedColumn = singleValuedColumn;
this.multiValuedColumn = multiValuedColumn;
this.bitmaps = bitmaps;
this.spatialIndex = spatialIndex;
long size = dictionary.getSerializedSize();
if (singleValCol != null && multiValCol == null) {
size += singleValCol.getSerializedSize();
} else if (singleValCol == null && multiValCol != null) {
size += multiValCol.getSerializedSize();
if (Feature.MULTI_VALUE.isSet(flags)) {
size += multiValuedColumn.getSerializedSize();
} else {
throw new IAE("Either singleValCol[%s] or multiValCol[%s] must be set", singleValCol, multiValCol);
size += singleValuedColumn.getSerializedSize();
}
size += bitmaps.getSerializedSize();
if (spatialIndex != null) {
size += spatialIndex.size() + Ints.BYTES;
@ -87,60 +251,38 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
this.size = size;
}
@JsonCreator
public DictionaryEncodedColumnPartSerde(
@JsonProperty("isSingleValued") boolean isSingleValued,
@JsonProperty("bitmapSerdeFactory") BitmapSerdeFactory bitmapSerdeFactory
)
{
this.isSingleValued = isSingleValued;
this.bitmapSerdeFactory = bitmapSerdeFactory == null
? new BitmapSerde.LegacyBitmapSerdeFactory()
: bitmapSerdeFactory;
this.dictionary = null;
this.singleValuedColumn = null;
this.multiValuedColumn = null;
this.bitmaps = null;
this.spatialIndex = null;
this.size = 0;
}
@JsonProperty
private boolean isSingleValued()
{
return isSingleValued;
}
@JsonProperty
public BitmapSerdeFactory getBitmapSerdeFactory()
{
return bitmapSerdeFactory;
}
@Override
public long numBytes()
@JsonProperty
public ByteOrder getByteOrder()
{
return 1 + size;
return byteOrder;
}
@Override
public void write(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{(byte) (isSingleValued ? 0x0 : 0x1)}));
channel.write(ByteBuffer.wrap(new byte[]{version.asByte()}));
if(version.compareTo(VERSION.COMPRESSED) >= 0) {
channel.write(ByteBuffer.wrap(Ints.toByteArray(flags)));
}
if (dictionary != null) {
dictionary.writeToChannel(channel);
}
if (isSingleValued()) {
if (singleValuedColumn != null) {
singleValuedColumn.writeToChannel(channel);
}
} else {
if (Feature.MULTI_VALUE.isSet(flags)) {
if (multiValuedColumn != null) {
multiValuedColumn.writeToChannel(channel);
}
} else {
if (singleValuedColumn != null) {
singleValuedColumn.writeToChannel(channel);
}
}
if (bitmaps != null) {
@ -157,67 +299,114 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
}
@Override
public ColumnPartSerde read(ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig)
public ColumnPartSerde read(
ByteBuffer buffer, ColumnBuilder builder, ColumnConfig columnConfig
)
{
final boolean isSingleValued = buffer.get() == 0x0;
final GenericIndexed<String> dictionary = GenericIndexed.read(buffer, GenericIndexed.stringStrategy);
final VSizeIndexedInts singleValuedColumn;
final VSizeIndexed multiValuedColumn;
final VERSION rVersion = VERSION.fromByte(buffer.get());
final int rFlags;
builder.setType(ValueType.STRING);
if (isSingleValued) {
singleValuedColumn = VSizeIndexedInts.readFromByteBuffer(buffer);
multiValuedColumn = null;
builder.setHasMultipleValues(false)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
dictionary,
singleValuedColumn,
null,
columnConfig.columnCacheSizeBytes()
)
);
if(rVersion.compareTo(VERSION.COMPRESSED) >= 0 ) {
rFlags = buffer.getInt();
} else {
singleValuedColumn = null;
multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
builder.setHasMultipleValues(true)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
dictionary,
null,
multiValuedColumn,
columnConfig.columnCacheSizeBytes()
)
);
rFlags = rVersion.equals(VERSION.UNCOMPRESSED_MULTI_VALUE) ?
Feature.MULTI_VALUE.getMask() :
NO_FLAGS;
}
GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(
final boolean hasMultipleValues = Feature.MULTI_VALUE.isSet(rFlags);
if(rVersion.equals(VERSION.COMPRESSED) && hasMultipleValues) {
throw new IAE("Compressed dictionary encoded columns currently do not support multi-value columns");
}
final GenericIndexed<String> rDictionary = GenericIndexed.read(buffer, GenericIndexed.stringStrategy);
builder.setType(ValueType.STRING);
final WritableSupplier<IndexedInts> rSingleValuedColumn;
final WritableSupplier<IndexedMultivalue<IndexedInts>> rMultiValuedColumn;
if (rVersion.compareTo(VERSION.COMPRESSED) >= 0) {
rSingleValuedColumn = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(buffer, byteOrder);
rMultiValuedColumn = null;
} else {
Pair<WritableSupplier<IndexedInts>, VSizeIndexed> cols = readUncompressed(rVersion, buffer);
rSingleValuedColumn = cols.lhs;
rMultiValuedColumn = cols.rhs == null ? null : cols.rhs.asWritableSupplier();
}
builder.setHasMultipleValues(hasMultipleValues)
.setDictionaryEncodedColumn(
new DictionaryEncodedColumnSupplier(
rDictionary,
rSingleValuedColumn,
rMultiValuedColumn,
columnConfig.columnCacheSizeBytes()
)
);
GenericIndexed<ImmutableBitmap> rBitmaps = GenericIndexed.read(
buffer, bitmapSerdeFactory.getObjectStrategy()
);
builder.setBitmapIndex(
new BitmapIndexColumnPartSupplier(
bitmapSerdeFactory.getBitmapFactory(),
bitmaps,
dictionary
rBitmaps,
rDictionary
)
);
ImmutableRTree spatialIndex = null;
ImmutableRTree rSpatialIndex = null;
if (buffer.hasRemaining()) {
spatialIndex = ByteBufferSerializer.read(
rSpatialIndex = ByteBufferSerializer.read(
buffer, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapSerdeFactory.getBitmapFactory())
);
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex));
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(rSpatialIndex));
}
return new DictionaryEncodedColumnPartSerde(
dictionary,
singleValuedColumn,
multiValuedColumn,
rVersion,
rFlags,
rDictionary,
rSingleValuedColumn,
rMultiValuedColumn,
bitmapSerdeFactory,
bitmaps,
spatialIndex
rBitmaps,
rSpatialIndex,
byteOrder
);
}
private static Pair<WritableSupplier<IndexedInts>, VSizeIndexed> readUncompressed(
VERSION version,
ByteBuffer buffer
)
{
final WritableSupplier<IndexedInts> singleValuedColumn;
final VSizeIndexed multiValuedColumn;
switch (version) {
case UNCOMPRESSED_SINGLE_VALUE:
singleValuedColumn = VSizeIndexedInts.readFromByteBuffer(buffer).asWritableSupplier();
multiValuedColumn = null;
break;
case UNCOMPRESSED_MULTI_VALUE:
singleValuedColumn = null;
multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
break;
default:
throw new IAE("Unsupported version[%s]", version);
}
return Pair.of(singleValuedColumn, multiValuedColumn);
}
@Override
public long numBytes()
{
return 1 + // version
(version.compareTo(VERSION.COMPRESSED) >= 0 ? Ints.BYTES : 0) + // flag if version >= compressed
size; // size of everything else (dictionary, bitmaps, column, spatialIndex)
}
}

View File

@ -22,22 +22,22 @@ import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
/**
*/
public class DictionaryEncodedColumnSupplier implements Supplier<DictionaryEncodedColumn>
{
private final GenericIndexed<String> dictionary;
private final VSizeIndexedInts singleValuedColumn;
private final VSizeIndexed multiValuedColumn;
private final Supplier<IndexedInts> singleValuedColumn;
private final Supplier<IndexedMultivalue<IndexedInts>> multiValuedColumn;
private final int lookupCacheSize;
public DictionaryEncodedColumnSupplier(
GenericIndexed<String> dictionary,
VSizeIndexedInts singleValuedColumn,
VSizeIndexed multiValuedColumn,
Supplier<IndexedInts> singleValuedColumn,
Supplier<IndexedMultivalue<IndexedInts>> multiValuedColumn,
int lookupCacheSize
)
{
@ -51,8 +51,8 @@ public class DictionaryEncodedColumnSupplier implements Supplier<DictionaryEncod
public DictionaryEncodedColumn get()
{
return new SimpleDictionaryEncodedColumn(
singleValuedColumn,
multiValuedColumn,
singleValuedColumn != null ? singleValuedColumn.get() : null,
multiValuedColumn != null ? multiValuedColumn.get() : null,
new CachingIndexed<>(dictionary, lookupCacheSize)
);
}

View File

@ -60,7 +60,8 @@ public class EmptyIndexTest
IndexMerger.merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],
tmpDir
tmpDir,
new IndexSpec()
);
QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir);

View File

@ -26,22 +26,61 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.column.Column;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
/**
*/
@RunWith(Parameterized.class)
public class IndexMergerTest
{
static {
@Parameterized.Parameters(name = "{index}: bitmap={0}, compression={1}")
public static Collection<Object[]> data()
{
return Arrays.asList(
new Object[][]{
{ null, null },
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 },
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZ4 },
{ new RoaringBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF},
{ new ConciseBitmapSerdeFactory(), CompressedObjectStrategy.CompressionStrategy.LZF},
}
);
}
static IndexSpec makeIndexSpec(
BitmapSerdeFactory bitmapSerdeFactory,
CompressedObjectStrategy.CompressionStrategy compressionStrategy
)
{
if(bitmapSerdeFactory != null || compressionStrategy != null) {
return new IndexSpec(
bitmapSerdeFactory,
compressionStrategy != null ? compressionStrategy.name().toLowerCase() : null,
null
);
} else {
return new IndexSpec();
}
}
private final IndexSpec indexSpec;
public IndexMergerTest(BitmapSerdeFactory bitmapSerdeFactory, CompressedObjectStrategy.CompressionStrategy compressionStrategy)
{
this.indexSpec = makeIndexSpec(bitmapSerdeFactory, compressionStrategy);
}
@Test
@ -54,7 +93,7 @@ public class IndexMergerTest
final File tempDir = Files.createTempDir();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec));
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -94,13 +133,13 @@ public class IndexMergerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec));
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec));
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
@ -110,7 +149,8 @@ public class IndexMergerTest
IndexMerger.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedDir
mergedDir,
indexSpec
)
);
@ -151,10 +191,10 @@ public class IndexMergerTest
)
);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2));
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1, indexSpec));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2, indexSpec));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3, indexSpec)
);
Assert.assertEquals(1, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());

View File

@ -0,0 +1,74 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import org.junit.Assert;
import org.junit.Test;
public class IndexSpecTest
{
@Test
public void testConfiguredBitmap() throws Exception
{
// this is just to make sure testSerde correctly tests the bitmap type override
Assert.assertEquals(new ConciseBitmapSerdeFactory(), IndexIO.CONFIGURED_BITMAP_SERDE_FACTORY);
}
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = new DefaultObjectMapper();
final String json = "{ \"bitmap\" : { \"type\" : \"roaring\" }, \"dimensionCompression\" : \"lz4\", \"metricCompression\" : \"lzf\" }";
final IndexSpec spec = objectMapper.readValue(json, IndexSpec.class);
Assert.assertEquals(new RoaringBitmapSerdeFactory(), spec.getBitmapSerdeFactory());
Assert.assertEquals(CompressedObjectStrategy.CompressionStrategy.LZ4, spec.getDimensionCompressionStrategy());
Assert.assertEquals(CompressedObjectStrategy.CompressionStrategy.LZF, spec.getMetricCompressionStrategy());
Assert.assertEquals(spec, objectMapper.readValue(objectMapper.writeValueAsBytes(spec), IndexSpec.class));
}
@Test
public void testSerdeUncompressed() throws Exception
{
final ObjectMapper objectMapper = new DefaultObjectMapper();
final String json = "{ \"dimensionCompression\" : \"uncompressed\" }";
final IndexSpec spec = objectMapper.readValue(json, IndexSpec.class);
Assert.assertEquals(IndexSpec.UNCOMPRESSED, spec.getDimensionCompression());
Assert.assertEquals(null, spec.getDimensionCompressionStrategy());
Assert.assertEquals(spec, objectMapper.readValue(objectMapper.writeValueAsBytes(spec), IndexSpec.class));
}
@Test
public void testDefaults() throws Exception
{
final IndexSpec spec = new IndexSpec();
Assert.assertEquals(IndexIO.CONFIGURED_BITMAP_SERDE_FACTORY, spec.getBitmapSerdeFactory());
Assert.assertEquals(CompressedObjectStrategy.CompressionStrategy.LZ4, spec.getDimensionCompressionStrategy());
Assert.assertEquals(CompressedObjectStrategy.CompressionStrategy.LZ4, spec.getMetricCompressionStrategy());
}
}

View File

@ -77,6 +77,8 @@ public class SchemalessIndex
new CountAggregatorFactory("count")
};
private static final IndexSpec indexSpec = new IndexSpec();
private static final List<Map<String, Object>> events = Lists.newArrayList();
private static final Map<Integer, Map<Integer, QueryableIndex>> incrementalIndexes = Maps.newHashMap();
@ -184,12 +186,12 @@ public class SchemalessIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(top, topFile);
IndexMerger.persist(bottom, bottomFile);
IndexMerger.persist(top, topFile, indexSpec);
IndexMerger.persist(bottom, bottomFile, indexSpec);
mergedIndex = io.druid.segment.IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile, indexSpec
)
);
@ -231,7 +233,7 @@ public class SchemalessIndex
QueryableIndex index = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile, indexSpec
)
);
@ -267,7 +269,7 @@ public class SchemalessIndex
}
QueryableIndex index = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile)
IndexMerger.mergeQueryableIndex(indexesToMerge, METRIC_AGGS, mergedFile, indexSpec)
);
return index;
@ -348,7 +350,7 @@ public class SchemalessIndex
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(rowIndex, tmpFile);
IndexMerger.persist(rowIndex, tmpFile, indexSpec);
rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile));
}
}
@ -408,7 +410,7 @@ public class SchemalessIndex
theFile.mkdirs();
theFile.deleteOnExit();
filesToMap.add(theFile);
IndexMerger.persist(index, theFile);
IndexMerger.persist(index, theFile, indexSpec);
}
return filesToMap;
@ -482,7 +484,7 @@ public class SchemalessIndex
)
);
return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile));
return IndexIO.loadIndex(IndexMerger.append(adapters, mergedFile, indexSpec));
}
catch (IOException e) {
throw Throwables.propagate(e);
@ -521,7 +523,8 @@ public class SchemalessIndex
)
),
METRIC_AGGS,
mergedFile
mergedFile,
indexSpec
)
);
}

View File

@ -70,6 +70,7 @@ public class TestIndex
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};
private static final IndexSpec indexSpec = new IndexSpec();
static {
if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
@ -131,14 +132,15 @@ public class TestIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(top, DATA_INTERVAL, topFile);
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile);
IndexMerger.persist(top, DATA_INTERVAL, topFile, indexSpec);
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile, indexSpec);
mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile
mergedFile,
indexSpec
)
);
@ -243,7 +245,7 @@ public class TestIndex
someTmpFile.mkdirs();
someTmpFile.deleteOnExit();
IndexMerger.persist(index, someTmpFile);
IndexMerger.persist(index, someTmpFile, indexSpec);
return IndexIO.loadIndex(someTmpFile);
}
catch (IOException e) {

View File

@ -43,6 +43,7 @@ import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -86,9 +87,10 @@ public class SpatialFilterBonusTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IndexSpec indexSpec = new IndexSpec();
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex(indexSpec);
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex(indexSpec);
return Arrays.asList(
new Object[][]{
{
@ -222,7 +224,7 @@ public class SpatialFilterBonusTest
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
private static QueryableIndex makeQueryableIndex(IndexSpec indexSpec) throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay");
@ -230,11 +232,11 @@ public class SpatialFilterBonusTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
IndexMerger.persist(theIndex, tmpFile, indexSpec);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
private static QueryableIndex makeMergedQueryableIndex(final IndexSpec indexSpec)
{
try {
IncrementalIndex first = new OnheapIncrementalIndex(
@ -410,15 +412,16 @@ public class SpatialFilterBonusTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
IndexMerger.persist(first, DATA_INTERVAL, firstFile, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
mergedFile,
indexSpec
)
);

View File

@ -44,6 +44,7 @@ import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -82,9 +83,10 @@ public class SpatialFilterTest
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IndexSpec indexSpec = new IndexSpec();
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex(indexSpec);
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex(indexSpec);
return Arrays.asList(
new Object[][]{
{
@ -251,7 +253,7 @@ public class SpatialFilterTest
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
private static QueryableIndex makeQueryableIndex(IndexSpec indexSpec) throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay");
@ -259,11 +261,11 @@ public class SpatialFilterTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
IndexMerger.persist(theIndex, tmpFile, indexSpec);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec)
{
try {
IncrementalIndex first = new OnheapIncrementalIndex(
@ -479,15 +481,16 @@ public class SpatialFilterTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
IndexMerger.persist(first, DATA_INTERVAL, firstFile, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
mergedFile,
indexSpec
)
);

View File

@ -20,6 +20,9 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.io.Files;
import io.druid.segment.IndexSpec;
import io.druid.segment.data.BitmapSerde;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
@ -42,6 +45,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final RejectionPolicyFactory defaultRejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
private static final int defaultMaxPendingPersists = 0;
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
private static final IndexSpec defaultIndexSpec = new IndexSpec();
private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false;
private static final int defaultBufferSize = 128 * 1024 * 1024; // 128M
@ -59,6 +63,7 @@ public class RealtimeTuningConfig implements TuningConfig
defaultRejectionPolicyFactory,
defaultMaxPendingPersists,
defaultShardSpec,
defaultIndexSpec,
defaultPersistInHeap,
defaultIngestOffheap,
defaultBufferSize
@ -73,6 +78,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final RejectionPolicyFactory rejectionPolicyFactory;
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final IndexSpec indexSpec;
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
@ -87,6 +93,7 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap,
@JsonProperty("buffersize") Integer bufferSize
@ -104,6 +111,7 @@ public class RealtimeTuningConfig implements TuningConfig
: rejectionPolicyFactory;
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
@ -158,6 +166,12 @@ public class RealtimeTuningConfig implements TuningConfig
return shardSpec;
}
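// Example value for the new "indexSpec" property inside a tuningConfig (illustrative):
//   "indexSpec" : { "bitmap" : { "type" : "roaring" },
//                   "dimensionCompression" : "lz4", "metricCompression" : "lzf" }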
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public boolean isPersistInHeap()
{
@ -185,6 +199,7 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize
@ -202,6 +217,7 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize

View File

@ -55,6 +55,7 @@ import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -438,13 +439,15 @@ public class RealtimePlumber implements Plumber
mergedFile = IndexMaker.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
mergedTarget,
config.getIndexSpec()
);
} else {
mergedFile = IndexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
mergedTarget,
config.getIndexSpec()
);
}
@ -833,15 +836,18 @@ public class RealtimePlumber implements Plumber
int numRows = indexToPersist.getIndex().size();
final File persistedFile;
final IndexSpec indexSpec = config.getIndexSpec();
if (config.isPersistInHeap()) {
persistedFile = IndexMaker.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
indexSpec
);
} else {
persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
indexSpec
);
}

View File

@ -74,7 +74,7 @@ public class FireDepartmentTest
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, false, false, null
null, null, null, null, null, null, null, null, null, false, false, null
)
);

View File

@ -105,6 +105,7 @@ public class RealtimeManagerTest
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -164,6 +164,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null,
null
);

View File

@ -62,6 +62,7 @@ public class SinkTest
null,
null,
null,
null,
false,
false,
null