Merge pull request #1728 from himanshug/aggregators_in_segment_metadata

Store AggregatorFactory[] in segment metadata
This commit is contained in:
Fangjin Yang 2016-01-19 12:55:49 -08:00
commit 0c31f007fc
53 changed files with 981 additions and 198 deletions

View File

@ -284,7 +284,8 @@ Append tasks append a list of segments together into a single segment (one after
"type": "append",
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>
"segments": <JSON list of DataSegment objects to append>,
"aggregations": <optional list of aggregators>
}
```

View File

@ -41,7 +41,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public abstract class SketchAggregatorFactory implements AggregatorFactory
public abstract class SketchAggregatorFactory extends AggregatorFactory
{
public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import java.util.Collections;
import java.util.List;
@ -66,7 +67,25 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
@Override
public AggregatorFactory getCombiningFactory()
{
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, isInputThetaSketch);
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other instanceof SketchMergeAggregatorFactory) {
SketchMergeAggregatorFactory castedOther = (SketchMergeAggregatorFactory) other;
return new SketchMergeAggregatorFactory(
name,
name,
Math.max(size, castedOther.size),
shouldFinalize,
true
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@JsonProperty

View File

@ -29,6 +29,7 @@ import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import org.apache.commons.codec.binary.Base64;
@ -39,7 +40,7 @@ import java.util.Comparator;
import java.util.List;
@JsonTypeName("approxHistogram")
public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
public class ApproximateHistogramAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x8;
@ -116,6 +117,26 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory
return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other instanceof ApproximateHistogramAggregatorFactory) {
ApproximateHistogramAggregatorFactory castedOther = (ApproximateHistogramAggregatorFactory) other;
return new ApproximateHistogramFoldingAggregatorFactory(
name,
name,
Math.max(resolution, castedOther.resolution),
numBuckets,
Math.min(lowerLimit, castedOther.lowerLimit),
Math.max(upperLimit, castedOther.upperLimit)
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -469,11 +469,11 @@ public class IndexGeneratorJob implements Jobby
{
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
index, interval, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
index, interval, file, config.getIndexSpec(), progressIndicator
);
}
}

View File

@ -222,7 +222,6 @@ public class YeOldePlumberSchool implements PlumberSchool
indexMerger.persist(
indexToPersist.getIndex(),
dirToPersist,
null,
config.getIndexSpec()
);

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.indexing.common.TaskToolbox;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
@ -50,18 +51,21 @@ public class AppendTask extends MergeTaskBase
{
private final IndexSpec indexSpec;
private final List<AggregatorFactory> aggregators;
@JsonCreator
public AppendTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, dataSource, segments, context);
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
this.aggregators = aggregators;
}
@Override
@ -109,7 +113,6 @@ public class AppendTask extends MergeTaskBase
);
List<IndexableAdapter> adapters = Lists.newArrayList();
for (final SegmentToMergeHolder holder : segmentsToMerge) {
adapters.add(
new RowboatFilteringIndexAdapter(
@ -128,7 +131,12 @@ public class AppendTask extends MergeTaskBase
);
}
return toolbox.getIndexMerger().append(adapters, outDir, indexSpec);
return toolbox.getIndexMerger().append(
adapters,
aggregators == null ? null : aggregators.toArray(new AggregatorFactory[aggregators.size()]),
outDir,
indexSpec
);
}
@Override
@ -137,6 +145,12 @@ public class AppendTask extends MergeTaskBase
return "append";
}
@JsonProperty("aggregations")
public List<AggregatorFactory> getAggregators()
{
return aggregators;
}
private static class SegmentToMergeHolder
{
private final DataSegment segment;

View File

@ -391,6 +391,9 @@ public class TaskSerdeTest
null,
"foo",
segments,
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("cnt")
),
indexSpec,
null
);
@ -417,6 +420,7 @@ public class TaskSerdeTest
Assert.assertEquals("foo", task3.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task3.getInterval());
Assert.assertEquals(task3.getSegments(), segments);
Assert.assertEquals(task.getAggregators(), task2.getAggregators());
}
@Test

View File

@ -154,7 +154,7 @@ public class IngestSegmentFirehoseFactoryTest
if (!persistDir.mkdirs() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
}
INDEX_MERGER.persist(index, persistDir, null, indexSpec);
INDEX_MERGER.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)

View File

@ -230,7 +230,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
}
try {
INDEX_MERGER.persist(index, persistDir, null, new IndexSpec());
INDEX_MERGER.persist(index, persistDir, new IndexSpec());
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -19,10 +19,13 @@
package io.druid.query.aggregation;
import com.metamx.common.logger.Logger;
import io.druid.segment.ColumnSelectorFactory;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Processing related interface
@ -34,13 +37,15 @@ import java.util.List;
* provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how
* the data is actually stored and accessed.
*/
public interface AggregatorFactory
public abstract class AggregatorFactory
{
public Aggregator factorize(ColumnSelectorFactory metricFactory);
private static final Logger log = new Logger(AggregatorFactory.class);
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
public abstract Aggregator factorize(ColumnSelectorFactory metricFactory);
public Comparator getComparator();
public abstract BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory);
public abstract Comparator getComparator();
/**
* A method that knows how to combine the outputs of the getIntermediate() method from the Aggregators
@ -53,7 +58,7 @@ public interface AggregatorFactory
*
* @return an object representing the combination of lhs and rhs, this can be a new object or a mutation of the inputs
*/
public Object combine(Object lhs, Object rhs);
public abstract Object combine(Object lhs, Object rhs);
/**
* Returns an AggregatorFactory that can be used to combine the output of aggregators from this factory. This
@ -62,14 +67,26 @@ public interface AggregatorFactory
*
* @return a new Factory that can be used for operations on top of data output from the current factory.
*/
public AggregatorFactory getCombiningFactory();
public abstract AggregatorFactory getCombiningFactory();
/**
* Returns an AggregatorFactory that can be used to merge the output of aggregators from this factory and
* other factory.
* This method is relevant only for AggregatorFactory which can be used at ingestion time.
*
* @return a new Factory that can be used for merging the output of aggregators from this factory and other.
*/
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
throw new UnsupportedOperationException(String.format("[%s] does not implement getMergingFactory(..)", this.getClass().getName()));
}
/**
* Gets a list of all columns that this AggregatorFactory will scan
*
* @return AggregatorFactories for the columns to scan of the parent AggregatorFactory
*/
public List<AggregatorFactory> getRequiredColumns();
public abstract List<AggregatorFactory> getRequiredColumns();
/**
* A method that knows how to "deserialize" the object from whatever form it might have been put into
@ -79,7 +96,7 @@ public interface AggregatorFactory
*
* @return the deserialized object
*/
public Object deserialize(Object object);
public abstract Object deserialize(Object object);
/**
* "Finalizes" the computation of an object. Primarily useful for complex types that have a different mergeable
@ -89,27 +106,76 @@ public interface AggregatorFactory
*
* @return the finalized value that should be returned for the initial query
*/
public Object finalizeComputation(Object object);
public abstract Object finalizeComputation(Object object);
public String getName();
public abstract String getName();
public List<String> requiredFields();
public abstract List<String> requiredFields();
public byte[] getCacheKey();
public abstract byte[] getCacheKey();
public String getTypeName();
public abstract String getTypeName();
/**
* Returns the maximum size that this aggregator will require in bytes for intermediate storage of results.
*
* @return the maximum number of bytes that an aggregator of this type will require for intermediate result storage.
*/
public int getMaxIntermediateSize();
public abstract int getMaxIntermediateSize();
/**
* Returns the starting value for a corresponding aggregator. For example, 0 for sums, - Infinity for max, an empty mogrifier
*
* @return the starting value for a corresponding aggregator.
*/
public Object getAggregatorStartValue();
public abstract Object getAggregatorStartValue();
/**
* Merges the list of AggregatorFactory[] (presumable from metadata of some segments being merged) and
* returns merged AggregatorFactory[] (for the metadata for merged segment).
* Null is returned if it is not possible to do the merging for any of the following reason.
* - one of the element in input list is null i.e. aggregators for one the segments being merged is unknown
* - AggregatorFactory of same name can not be merged if they are not compatible
*
* @param aggregatorsList
*
* @return merged AggregatorFactory[] or Null if merging is not possible.
*/
public static AggregatorFactory[] mergeAggregators(List<AggregatorFactory[]> aggregatorsList)
{
if (aggregatorsList == null || aggregatorsList.isEmpty()) {
return null;
}
Map<String, AggregatorFactory> mergedAggregators = new LinkedHashMap<>();
for (AggregatorFactory[] aggregators : aggregatorsList) {
if (aggregators != null) {
for (AggregatorFactory aggregator : aggregators) {
String name = aggregator.getName();
if (mergedAggregators.containsKey(name)) {
AggregatorFactory other = mergedAggregators.get(name);
try {
mergedAggregators.put(name, other.getMergingFactory(aggregator));
}
catch (AggregatorFactoryNotMergeableException ex) {
log.warn(ex, "failed to merge aggregator factories");
mergedAggregators = null;
break;
}
} else {
mergedAggregators.put(name, aggregator);
}
}
} else {
mergedAggregators = null;
break;
}
}
return mergedAggregators == null
? null
: mergedAggregators.values().toArray(new AggregatorFactory[mergedAggregators.size()]);
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
/**
*/
public class AggregatorFactoryNotMergeableException extends Exception
{
public AggregatorFactoryNotMergeableException()
{
}
public AggregatorFactoryNotMergeableException(String formatText, Object... arguments)
{
super(String.format(formatText, arguments));
}
public AggregatorFactoryNotMergeableException(Throwable cause, String formatText, Object... arguments)
{
super(String.format(formatText, arguments), cause);
}
public AggregatorFactoryNotMergeableException(Throwable cause)
{
super(cause);
}
public AggregatorFactoryNotMergeableException(AggregatorFactory af1, AggregatorFactory af2)
{
this(
"can't merge [%s : %s] and [%s : %s] , with detailed info [%s] and [%s]",
af1.getName(),
af1.getClass().getName(),
af2.getName(),
af2.getClass().getName(),
af1,
af2
);
}
}

View File

@ -32,7 +32,7 @@ import java.util.List;
/**
*/
public class CountAggregatorFactory implements AggregatorFactory
public class CountAggregatorFactory extends AggregatorFactory
{
private static final byte[] CACHE_KEY = new byte[]{0x0};
private final String name;

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class DoubleMaxAggregatorFactory implements AggregatorFactory
public class DoubleMaxAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x3;
@ -83,6 +83,16 @@ public class DoubleMaxAggregatorFactory implements AggregatorFactory
return new DoubleMaxAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class DoubleMinAggregatorFactory implements AggregatorFactory
public class DoubleMinAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x4;
@ -83,6 +83,16 @@ public class DoubleMinAggregatorFactory implements AggregatorFactory
return new DoubleMinAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class DoubleSumAggregatorFactory implements AggregatorFactory
public class DoubleSumAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x2;
@ -86,6 +86,16 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
return new DoubleSumAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -30,7 +30,7 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class FilteredAggregatorFactory implements AggregatorFactory
public class FilteredAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x9;

View File

@ -34,7 +34,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
public class HistogramAggregatorFactory implements AggregatorFactory
public class HistogramAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x7;

View File

@ -42,7 +42,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.List;
public class JavaScriptAggregatorFactory implements AggregatorFactory
public class JavaScriptAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x6;
@ -137,6 +137,18 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && other.getClass() == this.getClass()) {
JavaScriptAggregatorFactory castedOther = (JavaScriptAggregatorFactory) other;
if (this.fnCombine.equals(castedOther.fnCombine) && this.fnReset.equals(castedOther.fnReset)) {
return getCombiningFactory();
}
}
throw new AggregatorFactoryNotMergeableException(this, other);
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class LongMaxAggregatorFactory implements AggregatorFactory
public class LongMaxAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xA;
@ -83,6 +83,16 @@ public class LongMaxAggregatorFactory implements AggregatorFactory
return new LongMaxAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class LongMinAggregatorFactory implements AggregatorFactory
public class LongMinAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0xB;
@ -83,6 +83,16 @@ public class LongMinAggregatorFactory implements AggregatorFactory
return new LongMinAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -33,7 +33,7 @@ import java.util.List;
/**
*/
public class LongSumAggregatorFactory implements AggregatorFactory
public class LongSumAggregatorFactory extends AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x1;
@ -86,6 +86,16 @@ public class LongSumAggregatorFactory implements AggregatorFactory
return new LongSumAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
import com.metamx.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
@ -43,7 +44,7 @@ import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class CardinalityAggregatorFactory implements AggregatorFactory
public class CardinalityAggregatorFactory extends AggregatorFactory
{
public static Object estimateCardinality(Object object)
{
@ -147,6 +148,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
throw new UnsupportedOperationException("can't merge CardinalityAggregatorFactory");
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -25,6 +25,7 @@ import com.metamx.common.IAE;
import com.metamx.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
@ -38,7 +39,7 @@ import java.util.List;
/**
*/
public class HyperUniquesAggregatorFactory implements AggregatorFactory
public class HyperUniquesAggregatorFactory extends AggregatorFactory
{
public static Object estimateCardinality(Object object)
{
@ -139,6 +140,16 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory
return new HyperUniquesAggregatorFactory(name, name);
}
@Override
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException
{
if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) {
return getCombiningFactory();
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
}
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{

View File

@ -19,7 +19,8 @@
package io.druid.segment;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@ -271,8 +272,10 @@ public class IndexIO
case 6:
case 7:
log.info("Old version, re-persisting.");
QueryableIndex segmentToConvert = loadIndex(toConvert);
new IndexMerger(mapper, this).append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(segmentToConvert)),
null,
converted,
indexSpec
);
@ -1040,17 +1043,20 @@ public class IndexIO
segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
}
Map<String, Object> metadata = null;
Metadata metadata = null;
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
try {
metadata = mapper.readValue(
serializerUtils.readBytes(metadataBB, metadataBB.remaining()),
new TypeReference<Map<String, Object>>()
{
}
Metadata.class
);
}
catch (JsonParseException | JsonMappingException ex) {
// Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which
// is no longer supported then it is OK to not use the metadata instead of failing segment loading
log.warn(ex, "Failed to load metadata for segment [%s]", inDir);
}
catch (IOException ex) {
throw new IOException("Failed to read metadata", ex);
}

View File

@ -125,11 +125,10 @@ public class IndexMerger
public File persist(
final IncrementalIndex index,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec);
return persist(index, index.getInterval(), outDir, indexSpec);
}
/**
@ -148,18 +147,16 @@ public class IndexMerger
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
}
public File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -197,14 +194,16 @@ public class IndexMerger
),
index.getMetricAggs(),
outDir,
segmentMetadata,
indexSpec,
progress
);
}
public File mergeQueryableIndex(
List<QueryableIndex> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec
List<QueryableIndex> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec
) throws IOException
{
return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
@ -238,7 +237,6 @@ public class IndexMerger
indexAdapteres,
metricAggs,
outDir,
null,
indexSpec,
progress
);
@ -248,11 +246,10 @@ public class IndexMerger
List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
}
private static List<String> getLexicographicMergedDimensions(List<IndexableAdapter> indexes)
@ -291,7 +288,6 @@ public class IndexMerger
List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -371,11 +367,11 @@ public class IndexMerger
return makeIndexFiles(
indexes,
sortedMetricAggs,
outDir,
progress,
mergedDimensions,
mergedMetrics,
segmentMetadata,
rowMergerFn,
indexSpec
);
@ -395,11 +391,11 @@ public class IndexMerger
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
return makeIndexFiles(
ImmutableList.of(adapter),
null,
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
null,
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@ -414,15 +410,20 @@ public class IndexMerger
}
}
public File append(
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec
List<IndexableAdapter> indexes, AggregatorFactory[] aggregators, File outDir, IndexSpec indexSpec
) throws IOException
{
return append(indexes, outDir, indexSpec, new BaseProgressIndicator());
return append(indexes, aggregators, outDir, indexSpec, new BaseProgressIndicator());
}
public File append(
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec, ProgressIndicator progress
List<IndexableAdapter> indexes,
AggregatorFactory[] aggregators,
File outDir,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
{
FileUtils.deleteDirectory(outDir);
@ -470,20 +471,59 @@ public class IndexMerger
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
return makeIndexFiles(
indexes,
aggregators,
outDir,
progress,
mergedDimensions,
mergedMetrics,
rowMergerFn,
indexSpec
);
}
protected File makeIndexFiles(
final List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
final File outDir,
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Map<String, Object> segmentMetadata,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
{
List<Metadata> metadataList = Lists.transform(
indexes,
new Function<IndexableAdapter, Metadata>()
{
@Nullable
@Override
public Metadata apply(IndexableAdapter input)
{
return input.getMetadata();
}
}
);
Metadata segmentMetadata = null;
if (metricAggs != null) {
AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length];
for (int i = 0; i < metricAggs.length; i++) {
combiningMetricAggs[i] = metricAggs[i].getCombiningFactory();
}
segmentMetadata = Metadata.merge(
metadataList,
combiningMetricAggs
);
} else {
segmentMetadata = Metadata.merge(
metadataList,
null
);
}
final Map<String, ValueType> valueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
final Map<String, ColumnCapabilitiesImpl> columnCapabilities = Maps.newHashMap();
@ -921,7 +961,7 @@ public class IndexMerger
)
);
if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
if (segmentMetadata != null) {
writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
@ -1297,7 +1337,7 @@ public class IndexMerger
return true;
}
private void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException
private void writeMetadataToFile(File metadataFile, Metadata metadata) throws IOException
{
try (FileOutputStream metadataFileOutputStream = new FileOutputStream(metadataFile);
FileChannel metadataFilechannel = metadataFileOutputStream.getChannel()

View File

@ -21,9 +21,6 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -39,16 +36,12 @@ import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable;
import com.metamx.common.io.smoosh.FileSmoosher;
import com.metamx.common.io.smoosh.SmooshedWriter;
import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable;
import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.Column;
@ -72,8 +65,6 @@ import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.data.VSizeIndexedIntsWriter;
import io.druid.segment.data.VSizeIndexedWriter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnSerializer;
import io.druid.segment.serde.ComplexMetricSerde;
@ -116,11 +107,11 @@ public class IndexMergerV9 extends IndexMerger
@Override
protected File makeIndexFiles(
final List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
final File outDir,
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Map<String, Object> segmentMetadata,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
@ -128,6 +119,36 @@ public class IndexMergerV9 extends IndexMerger
progress.start();
progress.progress();
List<Metadata> metadataList = Lists.transform(
adapters,
new Function<IndexableAdapter, Metadata>()
{
@Nullable
@Override
public Metadata apply(IndexableAdapter input)
{
return input.getMetadata();
}
}
);
Metadata segmentMetadata = null;
if (metricAggs != null) {
AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length];
for (int i = 0; i < metricAggs.length; i++) {
combiningMetricAggs[i] = metricAggs[i].getCombiningFactory();
}
segmentMetadata = Metadata.merge(
metadataList,
combiningMetricAggs
);
} else {
segmentMetadata = Metadata.merge(
metadataList,
null
);
}
final IOPeon ioPeon = new TmpFileIOPeon(false);
final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
final File v9TmpDir = new File(outDir, "v9-tmp");
@ -221,10 +242,10 @@ public class IndexMergerV9 extends IndexMerger
private void makeMetadataBinary(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
final Map<String, Object> segmentMetadata
final Metadata segmentMetadata
) throws IOException
{
if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
if (segmentMetadata != null) {
progress.startSection("make metadata.drd");
v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(segmentMetadata)));
progress.stopSection("make metadata.drd");

View File

@ -49,4 +49,6 @@ public interface IndexableAdapter
String getMetricType(String metric);
ColumnCapabilities getCapabilities(String column);
Metadata getMetadata();
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.aggregation.AggregatorFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*/
public class Metadata
{
// container is used for arbitrary key-value pairs in segment metadata e.g.
// kafka firehose uses it to store commit offset
@JsonProperty
private final Map<String, Object> container;
@JsonProperty
private AggregatorFactory[] aggregators;
public Metadata()
{
container = new ConcurrentHashMap<>();
}
public AggregatorFactory[] getAggregators()
{
return aggregators;
}
public Metadata setAggregators(AggregatorFactory[] aggregators)
{
this.aggregators = aggregators;
return this;
}
public Metadata putAll(Map<String, Object> other)
{
if (other != null) {
container.putAll(other);
}
return this;
}
public Object get(String key)
{
return container.get(key);
}
public Metadata put(String key, Object value)
{
if (value != null) {
container.put(key, value);
}
return this;
}
// arbitrary key-value pairs from the metadata just follow the semantics of last one wins if same
// key exists in multiple input Metadata containers
// for others e.g. Aggregators, appropriate merging is done
public static Metadata merge(
List<Metadata> toBeMerged,
AggregatorFactory[] overrideMergedAggregators
)
{
if (toBeMerged == null || toBeMerged.size() == 0) {
return null;
}
boolean foundSomeMetadata = false;
Map<String, Object> mergedContainer = new HashMap<>();
List<AggregatorFactory[]> aggregatorsToMerge = overrideMergedAggregators == null
? new ArrayList<AggregatorFactory[]>()
: null;
for (Metadata metadata : toBeMerged) {
if (metadata != null) {
foundSomeMetadata = true;
if (aggregatorsToMerge != null) {
aggregatorsToMerge.add(metadata.getAggregators());
}
mergedContainer.putAll(metadata.container);
} else {
//if metadata and hence aggregators for some segment being merged are unknown then
//final merged segment should not have aggregators in the metadata
aggregatorsToMerge = null;
}
}
if(!foundSomeMetadata) {
return null;
}
Metadata result = new Metadata();
if (aggregatorsToMerge != null) {
result.setAggregators(AggregatorFactory.mergeAggregators(aggregatorsToMerge));
} else {
result.setAggregators(overrideMergedAggregators);
}
result.container.putAll(mergedContainer);
return result;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Metadata metadata = (Metadata) o;
if (!container.equals(metadata.container)) {
return false;
}
// Probably incorrect - comparing Object[] arrays with Arrays.equals
return Arrays.equals(aggregators, metadata.aggregators);
}
@Override
public int hashCode()
{
int result = container.hashCode();
result = 31 * result + (aggregators != null ? Arrays.hashCode(aggregators) : 0);
return result;
}
@Override
public String toString()
{
return "Metadata{" +
"container=" + container +
", aggregators=" + Arrays.toString(aggregators) +
'}';
}
}

View File

@ -25,7 +25,6 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
/**
*/
@ -36,7 +35,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
public BitmapFactory getBitmapFactoryForDimensions();
public Map<String, Object> getMetaData();
public Metadata getMetadata();
/**
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.

View File

@ -62,6 +62,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
private final int numRows;
private final QueryableIndex input;
private final List<String> availableDimensions;
private final Metadata metadata;
public QueryableIndexIndexableAdapter(QueryableIndex input)
{
@ -83,6 +84,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
log.info("No dictionary on dimension[%s]", dim);
}
}
this.metadata = input.getMetadata();
}
@Override
@ -379,8 +382,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
}
if (lastVal != null) {
if (GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) {
throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.",
value, lastVal);
throw new ISE(
"Value[%s] is less than the last value[%s] I have, cannot be.",
value, lastVal
);
}
return new EmptyIndexedInts();
}
@ -398,12 +403,20 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
}
return ret;
} else if (compareResult < 0) {
throw new ISE("Skipped currValue[%s], currIndex[%,d]; incoming value[%s]",
currVal, currIndex, value);
throw new ISE(
"Skipped currValue[%s], currIndex[%,d]; incoming value[%s]",
currVal, currIndex, value
);
} else {
return new EmptyIndexedInts();
}
}
};
}
@Override
public Metadata getMetadata()
{
return metadata;
}
}

View File

@ -99,4 +99,10 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
{
return baseAdapter.getBitmapIndexSeeker(dimension);
}
@Override
public Metadata getMetadata()
{
return baseAdapter.getMetadata();
}
}

View File

@ -39,7 +39,7 @@ public class SimpleQueryableIndex implements QueryableIndex
private final BitmapFactory bitmapFactory;
private final Map<String, Column> columns;
private final SmooshedFileMapper fileMapper;
private final Map<String, Object> metadata;
private final Metadata metadata;
public SimpleQueryableIndex(
Interval dataInterval,
@ -48,7 +48,7 @@ public class SimpleQueryableIndex implements QueryableIndex
BitmapFactory bitmapFactory,
Map<String, Column> columns,
SmooshedFileMapper fileMapper,
Map<String, Object> metadata
Metadata metadata
)
{
Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME));
@ -104,7 +104,7 @@ public class SimpleQueryableIndex implements QueryableIndex
}
@Override
public Map<String, Object> getMetaData()
public Metadata getMetadata()
{
return metadata;
}

View File

@ -44,6 +44,7 @@ import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.Metadata;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -262,6 +263,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
private final AggregatorFactory[] metrics;
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final Metadata metadata;
private final Map<String, MetricDesc> metricDescs;
private final Map<String, DimensionDesc> dimensionDescs;
@ -299,6 +301,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));
this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
this.columnCapabilities = Maps.newHashMap();
@ -621,6 +625,20 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
return getFacts().subMap(start, end);
}
public Metadata getMetadata()
{
return metadata;
}
private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)
{
AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
combiningAggregators[i] = aggregators[i].getCombiningFactory();
}
return combiningAggregators;
}
@Override
public Iterator<Row> iterator()
{

View File

@ -29,6 +29,7 @@ import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.Metadata;
import io.druid.segment.Rowboat;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities;
@ -58,6 +59,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
private final IncrementalIndex<?> index;
private final Map<String, Map<String, MutableBitmap>> invertedIndexes;
private final Set<String> hasNullValueDimensions;
private final Metadata metadata;
public IncrementalIndexAdapter(
Interval dataInterval, IncrementalIndex<?> index, BitmapFactory bitmapFactory
@ -65,6 +67,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
this.dataInterval = dataInterval;
this.index = index;
this.metadata = index.getMetadata();
this.invertedIndexes = Maps.newHashMap();
/* Sometimes it's hard to tell whether one dimension contains a null value or not.
* If one dimension had show a null or empty value explicitly, then yes, it contains
@ -398,4 +402,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
}
@Override
public Metadata getMetadata()
{
return metadata;
}
}

View File

@ -123,7 +123,7 @@ public class MultiValuedDimensionTest
persistedSegmentDir = Files.createTempDir();
TestHelper.getTestIndexMerger()
.persist(incrementalIndex, persistedSegmentDir, ImmutableMap.<String, Object>of(), new IndexSpec());
.persist(incrementalIndex, persistedSegmentDir, new IndexSpec());
queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
}

View File

@ -326,7 +326,7 @@ public class AggregationTestHelper
catch (IndexSizeExceededException ex) {
File tmp = tempFolder.newFolder();
toMerge.add(tmp);
indexMerger.persist(index, tmp, null, new IndexSpec());
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount);
}
@ -335,7 +335,7 @@ public class AggregationTestHelper
if (toMerge.size() > 0) {
File tmp = tempFolder.newFolder();
toMerge.add(tmp);
indexMerger.persist(index, tmp, null, new IndexSpec());
indexMerger.persist(index, tmp, new IndexSpec());
List<QueryableIndex> indexes = new ArrayList<>(toMerge.size());
for (File file : toMerge) {
@ -347,7 +347,7 @@ public class AggregationTestHelper
qi.close();
}
} else {
indexMerger.persist(index, outDir, null, new IndexSpec());
indexMerger.persist(index, outDir, new IndexSpec());
}
}
finally {

View File

@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
*/
public class AggregatorFactoryTest
{
@Test
public void testMergeAggregators()
{
Assert.assertNull(AggregatorFactory.mergeAggregators(null));
Assert.assertNull(AggregatorFactory.mergeAggregators(ImmutableList.<AggregatorFactory[]>of()));
List<AggregatorFactory[]> aggregatorsToBeMerged = new ArrayList<>();
aggregatorsToBeMerged.add(null);
Assert.assertNull(AggregatorFactory.mergeAggregators(aggregatorsToBeMerged));
AggregatorFactory[] emptyAggFactory = new AggregatorFactory[0];
aggregatorsToBeMerged.clear();
aggregatorsToBeMerged.add(emptyAggFactory);
Assert.assertArrayEquals(emptyAggFactory, AggregatorFactory.mergeAggregators(aggregatorsToBeMerged));
aggregatorsToBeMerged.clear();
aggregatorsToBeMerged.add(emptyAggFactory);
aggregatorsToBeMerged.add(null);
Assert.assertNull(AggregatorFactory.mergeAggregators(aggregatorsToBeMerged));
aggregatorsToBeMerged.clear();
AggregatorFactory[] af1 = new AggregatorFactory[]{
new LongMaxAggregatorFactory("name", "fieldName1")
};
AggregatorFactory[] af2 = new AggregatorFactory[]{
new LongMaxAggregatorFactory("name", "fieldName2")
};
Assert.assertArrayEquals(
new AggregatorFactory[]{
new LongMaxAggregatorFactory("name", "name")
},
AggregatorFactory.mergeAggregators(ImmutableList.of(af1, af2))
);
aggregatorsToBeMerged.clear();
af1 = new AggregatorFactory[]{
new LongMaxAggregatorFactory("name", "fieldName1")
};
af2 = new AggregatorFactory[]{
new DoubleMaxAggregatorFactory("name", "fieldName2")
};
Assert.assertNull(AggregatorFactory.mergeAggregators(ImmutableList.of(af1, af2))
);
}
}

View File

@ -63,7 +63,6 @@ public class EmptyIndexTest
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],
tmpDir,
null,
new IndexSpec()
);

View File

@ -142,7 +142,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
@ -153,6 +152,11 @@ public class IndexMergerTest
Assert.assertEquals(3, index.getColumnNames().size());
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
Assert.assertArrayEquals(
IncrementalIndexTest.getDefaultCombiningAggregatorFactories(),
index.getMetadata().getAggregators()
);
}
@Test
@ -180,7 +184,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
@ -210,6 +213,7 @@ public class IndexMergerTest
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim2", "2"));
}
@Test
public void testMergeRetainsValues() throws Exception
{
@ -231,7 +235,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -246,12 +249,12 @@ public class IndexMergerTest
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")};
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedAggregators,
mergedDir,
indexSpec
)
@ -276,7 +279,8 @@ public class IndexMergerTest
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
Map<String, Object> metadataElems = ImmutableMap.<String, Object>of("key", "value");
toPersist.getMetadata().putAll(metadataElems);
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(
@ -284,7 +288,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist,
tempDir,
segmentMetadata,
indexSpec
)
)
@ -296,7 +299,14 @@ public class IndexMergerTest
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
Assert.assertEquals(segmentMetadata, index.getMetaData());
Assert.assertEquals(
new Metadata()
.setAggregators(
IncrementalIndexTest.getDefaultCombiningAggregatorFactories()
)
.putAll(metadataElems),
index.getMetadata()
);
}
@Test
@ -338,7 +348,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -353,7 +362,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist2,
tempDir2,
null,
indexSpec
)
)
@ -363,11 +371,14 @@ public class IndexMergerTest
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(3, index2.getColumnNames().size());
AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{
new CountAggregatorFactory("count")
};
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedAggregators,
mergedDir,
indexSpec
)
@ -380,6 +391,11 @@ public class IndexMergerTest
assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
Assert.assertArrayEquals(
getCombiningAggregators(mergedAggregators),
merged.getMetadata().getAggregators()
);
}
@Test
@ -422,7 +438,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
@ -432,7 +447,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tmpDir2,
null,
indexSpec
)
)
@ -479,9 +493,9 @@ public class IndexMergerTest
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
INDEX_IO.loadIndex(
INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), null, tempDir1, indexSpec
)
)
);
@ -493,12 +507,17 @@ public class IndexMergerTest
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
Assert.assertArrayEquals(
IncrementalIndexTest.getDefaultCombiningAggregatorFactories(),
index1.getMetadata().getAggregators()
);
AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")};
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedAggregators,
mergedDir,
indexSpec
)
@ -513,6 +532,11 @@ public class IndexMergerTest
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy());
Assert.assertArrayEquals(
getCombiningAggregators(mergedAggregators),
merged.getMetadata().getAggregators()
);
}
@Test
@ -536,7 +560,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -551,19 +574,18 @@ public class IndexMergerTest
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
IndexSpec newSpec = new IndexSpec(
indexSpec.getBitmapSerdeFactory(),
"lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4",
"lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4"
);
AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")};
QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
ImmutableList.of(index1),
new AggregatorFactory[]{new CountAggregatorFactory("count")},
mergedAggregators,
mergedDir,
newSpec
)
@ -585,15 +607,15 @@ public class IndexMergerTest
public void testConvertSame() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
"dim1"
),
new LongSumAggregatorFactory("longSum2", "dim2")
}
);
final AggregatorFactory[] aggregators = new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
"dim1"
),
new LongSumAggregatorFactory("longSum2", "dim2")
};
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@ -606,7 +628,7 @@ public class IndexMergerTest
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, null, indexSpec))
INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, indexSpec))
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -636,6 +658,11 @@ public class IndexMergerTest
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy());
Assert.assertArrayEquals(
getCombiningAggregators(aggregators),
converted.getMetadata().getAggregators()
);
}
@ -643,15 +670,15 @@ public class IndexMergerTest
public void testConvertDifferent() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
"dim1"
),
new LongSumAggregatorFactory("longSum2", "dim2")
}
);
final AggregatorFactory[] aggregators = new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
"dim1"
),
new LongSumAggregatorFactory("longSum2", "dim2")
};
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@ -668,7 +695,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -708,6 +734,11 @@ public class IndexMergerTest
assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy());
assertDimCompression(converted, newSpec.getDimensionCompressionStrategy());
Assert.assertArrayEquals(
getCombiningAggregators(aggregators),
converted.getMetadata().getAggregators()
);
}
private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy)
@ -736,7 +767,6 @@ public class IndexMergerTest
Assert.assertEquals(expectedStrategy, strategy);
}
@Test
public void testNonLexicographicDimOrderMerge() throws Exception
{
@ -754,7 +784,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist1,
tmpDir,
null,
indexSpec
)
)
@ -765,7 +794,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
null,
indexSpec
)
)
@ -776,7 +804,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersist3,
tmpDir3,
null,
indexSpec
)
)
@ -826,7 +853,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
null,
indexSpec
)
)
@ -837,7 +863,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
null,
indexSpec
)
)
@ -954,7 +979,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
null,
indexSpec
)
)
@ -965,7 +989,6 @@ public class IndexMergerTest
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
null,
indexSpec
)
)
@ -1097,4 +1120,13 @@ public class IndexMergerTest
return toPersist1;
}
private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)
{
AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; i++) {
combiningAggregators[i] = aggregators[i].getCombiningFactory();
}
return combiningAggregators;
}
}

View File

@ -170,12 +170,13 @@ public class IndexMergerV9CompatibilityTest
DEFAULT_AGG_FACTORIES,
1000000
);
toPersist.getMetadata().put("key", "value");
for (InputRow event : events) {
toPersist.add(event);
}
tmpDir = Files.createTempDir();
persistTmpDir = new File(tmpDir, "persistDir");
INDEX_MERGER.persist(toPersist, persistTmpDir, null, INDEX_SPEC);
INDEX_MERGER.persist(toPersist, persistTmpDir, INDEX_SPEC);
}
@After
@ -191,10 +192,9 @@ public class IndexMergerV9CompatibilityTest
QueryableIndex index = null;
try {
outDir = Files.createTempDir();
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
index = INDEX_IO.loadIndex(INDEX_MERGER_V9.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC));
index = INDEX_IO.loadIndex(INDEX_MERGER_V9.persist(toPersist, outDir, INDEX_SPEC));
Assert.assertEquals(segmentMetadata, index.getMetaData());
Assert.assertEquals("value", index.getMetadata().get("key"));
}
finally {
if (index != null) {
@ -237,6 +237,7 @@ public class IndexMergerV9CompatibilityTest
{
final File outDir = INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(inDir)))),
null,
tmpDir,
INDEX_SPEC
);

View File

@ -57,7 +57,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class IndexMergerV9Test
@ -142,7 +141,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
@ -180,7 +178,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
@ -217,8 +214,7 @@ public class IndexMergerV9Test
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
toPersist.getMetadata().put("key", "value");
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(
@ -226,7 +222,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist,
tempDir,
segmentMetadata,
indexSpec
)
)
@ -238,7 +233,7 @@ public class IndexMergerV9Test
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
Assert.assertEquals(segmentMetadata, index.getMetaData());
Assert.assertEquals("value", index.getMetadata().get("key"));
}
@Test
@ -280,7 +275,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -295,7 +289,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist2,
tempDir2,
null,
indexSpec
)
)
@ -364,7 +357,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
@ -374,7 +366,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tmpDir2,
null,
indexSpec
)
)
@ -425,7 +416,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -482,7 +472,7 @@ public class IndexMergerV9Test
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.append(
ImmutableList.<IndexableAdapter>of(incrementalAdapter), tempDir1, indexSpec
ImmutableList.<IndexableAdapter>of(incrementalAdapter), null, tempDir1, indexSpec
)
)
);
@ -537,7 +527,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -607,7 +596,7 @@ public class IndexMergerV9Test
);
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, null, indexSpec))
INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, indexSpec))
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -669,7 +658,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
@ -751,7 +739,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
null,
indexSpec
)
)
@ -762,7 +749,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
null,
indexSpec
)
)
@ -879,7 +865,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersistA,
tmpDirA,
null,
indexSpec
)
)
@ -890,7 +875,6 @@ public class IndexMergerV9Test
INDEX_MERGER.persist(
toPersistB,
tmpDirB,
null,
indexSpec
)
)

View File

@ -258,7 +258,7 @@ public class IndexMergerV9WithSpatialIndexTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
INDEX_MERGER_V9.persist(theIndex, tmpFile, null, indexSpec);
INDEX_MERGER_V9.persist(theIndex, tmpFile, indexSpec);
return INDEX_IO.loadIndex(tmpFile);
}
@ -478,9 +478,9 @@ public class IndexMergerV9WithSpatialIndexTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER_V9.persist(first, DATA_INTERVAL, firstFile, null, indexSpec);
INDEX_MERGER_V9.persist(second, DATA_INTERVAL, secondFile, null, indexSpec);
INDEX_MERGER_V9.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec);
INDEX_MERGER_V9.persist(first, DATA_INTERVAL, firstFile, indexSpec);
INDEX_MERGER_V9.persist(second, DATA_INTERVAL, secondFile, indexSpec);
INDEX_MERGER_V9.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
QueryableIndex mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER_V9.mergeQueryableIndex(

View File

@ -0,0 +1,118 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
*/
public class MetadataTest
{
@Test
public void testSerde() throws Exception
{
ObjectMapper jsonMapper = new DefaultObjectMapper();
Metadata metadata = new Metadata();
metadata.put("k", "v");
AggregatorFactory[] aggregators = new AggregatorFactory[]
{
new LongSumAggregatorFactory("out", "in")
};
metadata.setAggregators(aggregators);
Metadata other = jsonMapper.readValue(
jsonMapper.writeValueAsString(metadata),
Metadata.class
);
Assert.assertEquals(metadata, other);
}
@Test
public void testMerge()
{
Assert.assertNull(Metadata.merge(null, null));
Assert.assertNull(Metadata.merge(ImmutableList.<Metadata>of(), null));
List<Metadata> metadataToBeMerged = new ArrayList<>();
metadataToBeMerged.add(null);
Assert.assertNull(Metadata.merge(metadataToBeMerged, null));
//sanity merge check
AggregatorFactory[] aggs = new AggregatorFactory[] {
new LongMaxAggregatorFactory("n", "f")
};
Metadata m1 = new Metadata();
m1.put("k", "v");
m1.setAggregators(aggs);
Metadata m2 = new Metadata();
m2.put("k", "v");
m2.setAggregators(aggs);
Metadata merged = new Metadata();
merged.put("k", "v");
merged.setAggregators(
new AggregatorFactory[]{
new LongMaxAggregatorFactory("n", "n")
}
);
Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null));
//merge check with one metadata being null
metadataToBeMerged.clear();
metadataToBeMerged.add(m1);
metadataToBeMerged.add(m2);
metadataToBeMerged.add(null);
merged.setAggregators(null);
Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null));
//merge check with client explicitly providing merged aggregators
AggregatorFactory[] explicitAggs = new AggregatorFactory[] {
new DoubleMaxAggregatorFactory("x", "y")
};
merged.setAggregators(explicitAggs);
Assert.assertEquals(
merged,
Metadata.merge(metadataToBeMerged, explicitAggs)
);
Assert.assertEquals(
merged,
Metadata.merge(ImmutableList.of(m1, m2), explicitAggs)
);
}
}

View File

@ -19,8 +19,7 @@
package io.druid.segment;
import java.io.File;
import com.metamx.common.ISE;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
@ -28,13 +27,12 @@ import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import com.metamx.common.ISE;
import java.io.File;
public class QueryableIndexIndexableAdapterTest {
private final static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
@ -69,7 +67,6 @@ public class QueryableIndexIndexableAdapterTest {
INDEX_MERGER.persist(
toPersist,
tempDir,
null,
INDEX_SPEC
)
)

View File

@ -191,8 +191,8 @@ public class SchemalessIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER.persist(top, topFile, null, indexSpec);
INDEX_MERGER.persist(bottom, bottomFile, null, indexSpec);
INDEX_MERGER.persist(top, topFile, indexSpec);
INDEX_MERGER.persist(bottom, bottomFile, indexSpec);
mergedIndex = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
@ -361,7 +361,7 @@ public class SchemalessIndex
tmpFile.mkdirs();
tmpFile.deleteOnExit();
INDEX_MERGER.persist(rowIndex, tmpFile, null, indexSpec);
INDEX_MERGER.persist(rowIndex, tmpFile, indexSpec);
rowPersistedIndexes.add(INDEX_IO.loadIndex(tmpFile));
}
}
@ -421,7 +421,7 @@ public class SchemalessIndex
theFile.mkdirs();
theFile.deleteOnExit();
filesToMap.add(theFile);
INDEX_MERGER.persist(index, theFile, null, indexSpec);
INDEX_MERGER.persist(index, theFile, indexSpec);
}
return filesToMap;
@ -495,7 +495,7 @@ public class SchemalessIndex
)
);
return INDEX_IO.loadIndex(INDEX_MERGER.append(adapters, mergedFile, indexSpec));
return INDEX_IO.loadIndex(INDEX_MERGER.append(adapters, null, mergedFile, indexSpec));
}
catch (IOException e) {
throw Throwables.propagate(e);

View File

@ -38,9 +38,10 @@ public class TestHelper
private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
public static final ObjectMapper JSON_MAPPER;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
@ -56,6 +57,11 @@ public class TestHelper
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}
public static ObjectMapper getTestObjectMapper()
{
return JSON_MAPPER;
}
public static IndexMerger getTestIndexMerger()
{

View File

@ -143,8 +143,8 @@ public class TestIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER.persist(top, DATA_INTERVAL, topFile, null, indexSpec);
INDEX_MERGER.persist(bottom, DATA_INTERVAL, bottomFile, null, indexSpec);
INDEX_MERGER.persist(top, DATA_INTERVAL, topFile, indexSpec);
INDEX_MERGER.persist(bottom, DATA_INTERVAL, bottomFile, indexSpec);
mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
@ -243,7 +243,7 @@ public class TestIndex
someTmpFile.mkdirs();
someTmpFile.deleteOnExit();
INDEX_MERGER.persist(index, someTmpFile, null, indexSpec);
INDEX_MERGER.persist(index, someTmpFile, indexSpec);
return INDEX_IO.loadIndex(someTmpFile);
}
catch (IOException e) {

View File

@ -128,6 +128,16 @@ public class IncrementalIndexTest
);
}
public static AggregatorFactory[] getDefaultAggregatorFactories()
{
return defaultAggregatorFactories;
}
public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
{
return defaultCombiningAggregatorFactories;
}
public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
{
if (null == aggregatorFactories) {
@ -188,6 +198,10 @@ public class IncrementalIndexTest
)
};
private static final AggregatorFactory[] defaultCombiningAggregatorFactories = new AggregatorFactory[]{
defaultAggregatorFactories[0].getCombiningFactory()
};
@Test
public void testCaseSensitivity() throws Exception
{

View File

@ -237,7 +237,7 @@ public class SpatialFilterBonusTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
INDEX_MERGER.persist(theIndex, tmpFile, null, indexSpec);
INDEX_MERGER.persist(theIndex, tmpFile, indexSpec);
return INDEX_IO.loadIndex(tmpFile);
}
@ -417,9 +417,9 @@ public class SpatialFilterBonusTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, null, indexSpec);
INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, null, indexSpec);
INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec);
INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, indexSpec);
INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, indexSpec);
INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
QueryableIndex mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(

View File

@ -266,7 +266,7 @@ public class SpatialFilterTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
INDEX_MERGER.persist(theIndex, tmpFile, null, indexSpec);
INDEX_MERGER.persist(theIndex, tmpFile, indexSpec);
return INDEX_IO.loadIndex(tmpFile);
}
@ -486,9 +486,9 @@ public class SpatialFilterTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, null, indexSpec);
INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, null, indexSpec);
INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec);
INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, indexSpec);
INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, indexSpec);
INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
QueryableIndex mergedRealtime = INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(

View File

@ -63,6 +63,7 @@ import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.Metadata;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
@ -86,6 +87,7 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.ws.rs.HEAD;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
@ -403,7 +405,7 @@ public class RealtimePlumber implements Plumber
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
final Map<String, Object> metadata = committer.getMetadata() == null ? null :
final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
@ -447,7 +449,7 @@ public class RealtimePlumber implements Plumber
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(
persistHydrant(
pair.lhs, schema, pair.rhs, metadata
pair.lhs, schema, pair.rhs, metadataElems
)
);
}
@ -767,7 +769,7 @@ public class RealtimePlumber implements Plumber
//at some point.
continue;
}
Map<String, Object> segmentMetadata = queryableIndex.getMetaData();
Metadata segmentMetadata = queryableIndex.getMetadata();
if (segmentMetadata != null) {
Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY);
if (timestampObj != null) {
@ -775,10 +777,10 @@ public class RealtimePlumber implements Plumber
if (timestamp > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
queryableIndex.getMetaData(), timestamp, latestCommitTime
queryableIndex.getMetadata(), timestamp, latestCommitTime
);
latestCommitTime = timestamp;
metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY);
metadata = queryableIndex.getMetadata().get(COMMIT_METADATA_KEY);
}
}
}
@ -1000,7 +1002,7 @@ public class RealtimePlumber implements Plumber
FireHydrant indexToPersist,
DataSchema schema,
Interval interval,
Map<String, Object> metaData
Map<String, Object> metadataElems
)
{
synchronized (indexToPersist) {
@ -1016,7 +1018,7 @@ public class RealtimePlumber implements Plumber
"DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]",
schema.getDataSource(),
interval,
metaData,
metadataElems,
indexToPersist
);
try {
@ -1024,11 +1026,11 @@ public class RealtimePlumber implements Plumber
final IndexSpec indexSpec = config.getIndexSpec();
indexToPersist.getIndex().getMetadata().putAll(metadataElems);
final File persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
interval,
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
metaData,
indexSpec
);

View File

@ -124,7 +124,7 @@ public class IngestSegmentFirehoseTest
for (String line : rows) {
index.add(parser.parse(line));
}
indexMerger.persist(index, segmentDir, null, new IndexSpec());
indexMerger.persist(index, segmentDir, new IndexSpec());
}
finally {
if (index != null) {