diff --git a/docs/content/misc/tasks.md b/docs/content/misc/tasks.md index 5b921de5829..cc9788218c6 100644 --- a/docs/content/misc/tasks.md +++ b/docs/content/misc/tasks.md @@ -284,7 +284,8 @@ Append tasks append a list of segments together into a single segment (one after "type": "append", "id": , "dataSource": , - "segments": + "segments": , + "aggregations": } ``` diff --git a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 9b5597a90ea..1e1047d9c7d 100644 --- a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -41,7 +41,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; -public abstract class SketchAggregatorFactory implements AggregatorFactory +public abstract class SketchAggregatorFactory extends AggregatorFactory { public static final int DEFAULT_MAX_SKETCH_SIZE = 16384; diff --git a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index b6fde154f15..69e869f121f 100644 --- a/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.yahoo.sketches.theta.Sketch; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import java.util.Collections; import java.util.List; @@ -66,7 +67,25 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory @Override public AggregatorFactory getCombiningFactory() { - return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, isInputThetaSketch); + return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && other instanceof SketchMergeAggregatorFactory) { + SketchMergeAggregatorFactory castedOther = (SketchMergeAggregatorFactory) other; + + return new SketchMergeAggregatorFactory( + name, + name, + Math.max(size, castedOther.size), + shouldFinalize, + true + ); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } } @JsonProperty diff --git a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java index 890ba457f1d..019fb23bda2 100644 --- a/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java +++ b/extensions/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregatorFactory.java @@ -29,6 +29,7 @@ import com.google.common.primitives.Ints; import com.metamx.common.StringUtils; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ColumnSelectorFactory; import org.apache.commons.codec.binary.Base64; @@ -39,7 +40,7 @@ import java.util.Comparator; import java.util.List; @JsonTypeName("approxHistogram") -public class ApproximateHistogramAggregatorFactory implements AggregatorFactory +public class ApproximateHistogramAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x8; @@ -116,6 +117,26 @@ public class ApproximateHistogramAggregatorFactory implements AggregatorFactory return new ApproximateHistogramFoldingAggregatorFactory(name, name, resolution, numBuckets, lowerLimit, upperLimit); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && other instanceof ApproximateHistogramAggregatorFactory) { + ApproximateHistogramAggregatorFactory castedOther = (ApproximateHistogramAggregatorFactory) other; + + return new ApproximateHistogramFoldingAggregatorFactory( + name, + name, + Math.max(resolution, castedOther.resolution), + numBuckets, + Math.min(lowerLimit, castedOther.lowerLimit), + Math.max(upperLimit, castedOther.upperLimit) + ); + + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 177fde0635d..5baf2c05ddc 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -469,11 +469,11 @@ public class IndexGeneratorJob implements Jobby { if (config.isBuildV9Directly()) { return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist( - index, interval, file, null, config.getIndexSpec(), progressIndicator + index, interval, file, config.getIndexSpec(), progressIndicator ); } else { return HadoopDruidIndexerConfig.INDEX_MERGER.persist( - index, interval, file, null, config.getIndexSpec(), progressIndicator + index, interval, file, config.getIndexSpec(), progressIndicator ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 07305c54ff5..e95c77ace71 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -222,7 +222,6 @@ public class YeOldePlumberSchool implements PlumberSchool indexMerger.persist( indexToPersist.getIndex(), dirToPersist, - null, config.getIndexSpec() ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java index 53678e616b3..36fd093c160 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import io.druid.indexing.common.TaskToolbox; +import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexSpec; import io.druid.segment.IndexableAdapter; import io.druid.segment.QueryableIndexIndexableAdapter; @@ -50,18 +51,21 @@ public class AppendTask extends MergeTaskBase { private final IndexSpec indexSpec; + private final List aggregators; @JsonCreator public AppendTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, + @JsonProperty("aggregations") List aggregators, @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("context") Map context ) { super(id, dataSource, segments, context); this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; + this.aggregators = aggregators; } @Override @@ -109,7 +113,6 @@ public class AppendTask extends MergeTaskBase ); List adapters = Lists.newArrayList(); - for (final SegmentToMergeHolder holder : segmentsToMerge) { adapters.add( new RowboatFilteringIndexAdapter( @@ -128,7 +131,12 @@ public class AppendTask extends MergeTaskBase ); } - return toolbox.getIndexMerger().append(adapters, outDir, indexSpec); + return toolbox.getIndexMerger().append( + adapters, + aggregators == null ? null : aggregators.toArray(new AggregatorFactory[aggregators.size()]), + outDir, + indexSpec + ); } @Override @@ -137,6 +145,12 @@ public class AppendTask extends MergeTaskBase return "append"; } + @JsonProperty("aggregations") + public List getAggregators() + { + return aggregators; + } + private static class SegmentToMergeHolder { private final DataSegment segment; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index efb2fead456..ffe1b0642f1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -391,6 +391,9 @@ public class TaskSerdeTest null, "foo", segments, + ImmutableList.of( + new CountAggregatorFactory("cnt") + ), indexSpec, null ); @@ -417,6 +420,7 @@ public class TaskSerdeTest Assert.assertEquals("foo", task3.getDataSource()); Assert.assertEquals(new Interval("2010-01-01/P2D"), task3.getInterval()); Assert.assertEquals(task3.getSegments(), segments); + Assert.assertEquals(task.getAggregators(), task2.getAggregators()); } @Test diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 37b1d01df36..cb85e37e5bb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -154,7 +154,7 @@ public class IngestSegmentFirehoseFactoryTest if (!persistDir.mkdirs() && !persistDir.exists()) { throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath())); } - INDEX_MERGER.persist(index, persistDir, null, indexSpec); + INDEX_MERGER.persist(index, persistDir, indexSpec); final TaskLockbox tl = new TaskLockbox(ts); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index bca91545fb6..4baa7accc11 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -230,7 +230,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest } try { - INDEX_MERGER.persist(index, persistDir, null, new IndexSpec()); + INDEX_MERGER.persist(index, persistDir, new IndexSpec()); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java index 81ea7dd185e..f438570cada 100644 --- a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactory.java @@ -19,10 +19,13 @@ package io.druid.query.aggregation; +import com.metamx.common.logger.Logger; import io.druid.segment.ColumnSelectorFactory; import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * Processing related interface @@ -34,13 +37,15 @@ import java.util.List; * provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how * the data is actually stored and accessed. */ -public interface AggregatorFactory +public abstract class AggregatorFactory { - public Aggregator factorize(ColumnSelectorFactory metricFactory); + private static final Logger log = new Logger(AggregatorFactory.class); - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory); + public abstract Aggregator factorize(ColumnSelectorFactory metricFactory); - public Comparator getComparator(); + public abstract BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory); + + public abstract Comparator getComparator(); /** * A method that knows how to combine the outputs of the getIntermediate() method from the Aggregators @@ -53,7 +58,7 @@ public interface AggregatorFactory * * @return an object representing the combination of lhs and rhs, this can be a new object or a mutation of the inputs */ - public Object combine(Object lhs, Object rhs); + public abstract Object combine(Object lhs, Object rhs); /** * Returns an AggregatorFactory that can be used to combine the output of aggregators from this factory. This @@ -62,14 +67,26 @@ public interface AggregatorFactory * * @return a new Factory that can be used for operations on top of data output from the current factory. */ - public AggregatorFactory getCombiningFactory(); + public abstract AggregatorFactory getCombiningFactory(); + + /** + * Returns an AggregatorFactory that can be used to merge the output of aggregators from this factory and + * other factory. + * This method is relevant only for AggregatorFactory which can be used at ingestion time. + * + * @return a new Factory that can be used for merging the output of aggregators from this factory and other. + */ + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + throw new UnsupportedOperationException(String.format("[%s] does not implement getMergingFactory(..)", this.getClass().getName())); + } /** * Gets a list of all columns that this AggregatorFactory will scan * * @return AggregatorFactories for the columns to scan of the parent AggregatorFactory */ - public List getRequiredColumns(); + public abstract List getRequiredColumns(); /** * A method that knows how to "deserialize" the object from whatever form it might have been put into @@ -79,7 +96,7 @@ public interface AggregatorFactory * * @return the deserialized object */ - public Object deserialize(Object object); + public abstract Object deserialize(Object object); /** * "Finalizes" the computation of an object. Primarily useful for complex types that have a different mergeable @@ -89,27 +106,76 @@ public interface AggregatorFactory * * @return the finalized value that should be returned for the initial query */ - public Object finalizeComputation(Object object); + public abstract Object finalizeComputation(Object object); - public String getName(); + public abstract String getName(); - public List requiredFields(); + public abstract List requiredFields(); - public byte[] getCacheKey(); + public abstract byte[] getCacheKey(); - public String getTypeName(); + public abstract String getTypeName(); /** * Returns the maximum size that this aggregator will require in bytes for intermediate storage of results. * * @return the maximum number of bytes that an aggregator of this type will require for intermediate result storage. */ - public int getMaxIntermediateSize(); + public abstract int getMaxIntermediateSize(); /** * Returns the starting value for a corresponding aggregator. For example, 0 for sums, - Infinity for max, an empty mogrifier * * @return the starting value for a corresponding aggregator. */ - public Object getAggregatorStartValue(); + public abstract Object getAggregatorStartValue(); + + /** + * Merges the list of AggregatorFactory[] (presumable from metadata of some segments being merged) and + * returns merged AggregatorFactory[] (for the metadata for merged segment). + * Null is returned if it is not possible to do the merging for any of the following reason. + * - one of the element in input list is null i.e. aggregators for one the segments being merged is unknown + * - AggregatorFactory of same name can not be merged if they are not compatible + * + * @param aggregatorsList + * + * @return merged AggregatorFactory[] or Null if merging is not possible. + */ + public static AggregatorFactory[] mergeAggregators(List aggregatorsList) + { + if (aggregatorsList == null || aggregatorsList.isEmpty()) { + return null; + } + + Map mergedAggregators = new LinkedHashMap<>(); + + for (AggregatorFactory[] aggregators : aggregatorsList) { + + if (aggregators != null) { + for (AggregatorFactory aggregator : aggregators) { + String name = aggregator.getName(); + if (mergedAggregators.containsKey(name)) { + AggregatorFactory other = mergedAggregators.get(name); + try { + mergedAggregators.put(name, other.getMergingFactory(aggregator)); + } + catch (AggregatorFactoryNotMergeableException ex) { + log.warn(ex, "failed to merge aggregator factories"); + mergedAggregators = null; + break; + } + } else { + mergedAggregators.put(name, aggregator); + } + } + } else { + mergedAggregators = null; + break; + } + } + + return mergedAggregators == null + ? null + : mergedAggregators.values().toArray(new AggregatorFactory[mergedAggregators.size()]); + } } diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorFactoryNotMergeableException.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactoryNotMergeableException.java new file mode 100644 index 00000000000..a0e4788dafd --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorFactoryNotMergeableException.java @@ -0,0 +1,57 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query.aggregation; + +/** + */ +public class AggregatorFactoryNotMergeableException extends Exception +{ + public AggregatorFactoryNotMergeableException() + { + } + + public AggregatorFactoryNotMergeableException(String formatText, Object... arguments) + { + super(String.format(formatText, arguments)); + } + + public AggregatorFactoryNotMergeableException(Throwable cause, String formatText, Object... arguments) + { + super(String.format(formatText, arguments), cause); + } + + public AggregatorFactoryNotMergeableException(Throwable cause) + { + super(cause); + } + + public AggregatorFactoryNotMergeableException(AggregatorFactory af1, AggregatorFactory af2) + { + this( + "can't merge [%s : %s] and [%s : %s] , with detailed info [%s] and [%s]", + af1.getName(), + af1.getClass().getName(), + af2.getName(), + af2.getClass().getName(), + af1, + af2 + ); + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java index 4bf2018f3b4..1f12a5d59df 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountAggregatorFactory.java @@ -32,7 +32,7 @@ import java.util.List; /** */ -public class CountAggregatorFactory implements AggregatorFactory +public class CountAggregatorFactory extends AggregatorFactory { private static final byte[] CACHE_KEY = new byte[]{0x0}; private final String name; diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java index 50dd2a49fb4..029bc86afaf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class DoubleMaxAggregatorFactory implements AggregatorFactory +public class DoubleMaxAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x3; @@ -83,6 +83,16 @@ public class DoubleMaxAggregatorFactory implements AggregatorFactory return new DoubleMaxAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java index af98615a9a9..04f1de0ede3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class DoubleMinAggregatorFactory implements AggregatorFactory +public class DoubleMinAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x4; @@ -83,6 +83,16 @@ public class DoubleMinAggregatorFactory implements AggregatorFactory return new DoubleMinAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java index 0cf3e5a5ef0..a05d76a1094 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class DoubleSumAggregatorFactory implements AggregatorFactory +public class DoubleSumAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x2; @@ -86,6 +86,16 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory return new DoubleSumAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 9fa42c9c31c..8ee215677c3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -30,7 +30,7 @@ import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; -public class FilteredAggregatorFactory implements AggregatorFactory +public class FilteredAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x9; diff --git a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java index 7a7ae283d0d..318158f5151 100644 --- a/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/HistogramAggregatorFactory.java @@ -34,7 +34,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; -public class HistogramAggregatorFactory implements AggregatorFactory +public class HistogramAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x7; diff --git a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java index 5a30c6c8374..e43e190331c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/JavaScriptAggregatorFactory.java @@ -42,7 +42,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Comparator; import java.util.List; -public class JavaScriptAggregatorFactory implements AggregatorFactory +public class JavaScriptAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x6; @@ -137,6 +137,18 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory return new JavaScriptAggregatorFactory(name, Lists.newArrayList(name), fnCombine, fnReset, fnCombine); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && other.getClass() == this.getClass()) { + JavaScriptAggregatorFactory castedOther = (JavaScriptAggregatorFactory) other; + if (this.fnCombine.equals(castedOther.fnCombine) && this.fnReset.equals(castedOther.fnReset)) { + return getCombiningFactory(); + } + } + throw new AggregatorFactoryNotMergeableException(this, other); + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java index 78a7c8c1c81..91f8c11fee3 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMaxAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class LongMaxAggregatorFactory implements AggregatorFactory +public class LongMaxAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0xA; @@ -83,6 +83,16 @@ public class LongMaxAggregatorFactory implements AggregatorFactory return new LongMaxAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java index 4c31189c1da..82f2111a43a 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongMinAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class LongMinAggregatorFactory implements AggregatorFactory +public class LongMinAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0xB; @@ -83,6 +83,16 @@ public class LongMinAggregatorFactory implements AggregatorFactory return new LongMinAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java index 17bf61093ca..33b65d2fb61 100644 --- a/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/LongSumAggregatorFactory.java @@ -33,7 +33,7 @@ import java.util.List; /** */ -public class LongSumAggregatorFactory implements AggregatorFactory +public class LongSumAggregatorFactory extends AggregatorFactory { private static final byte CACHE_TYPE_ID = 0x1; @@ -86,6 +86,16 @@ public class LongSumAggregatorFactory implements AggregatorFactory return new LongSumAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 6b177116e03..6531d47b6bf 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.metamx.common.StringUtils; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; @@ -43,7 +44,7 @@ import java.nio.ByteBuffer; import java.util.Comparator; import java.util.List; -public class CardinalityAggregatorFactory implements AggregatorFactory +public class CardinalityAggregatorFactory extends AggregatorFactory { public static Object estimateCardinality(Object object) { @@ -147,6 +148,12 @@ public class CardinalityAggregatorFactory implements AggregatorFactory return new HyperUniquesAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + throw new UnsupportedOperationException("can't merge CardinalityAggregatorFactory"); + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java index 7825de72e03..cc8ff7ba431 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregatorFactory.java @@ -25,6 +25,7 @@ import com.metamx.common.IAE; import com.metamx.common.StringUtils; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ColumnSelectorFactory; @@ -38,7 +39,7 @@ import java.util.List; /** */ -public class HyperUniquesAggregatorFactory implements AggregatorFactory +public class HyperUniquesAggregatorFactory extends AggregatorFactory { public static Object estimateCardinality(Object object) { @@ -139,6 +140,16 @@ public class HyperUniquesAggregatorFactory implements AggregatorFactory return new HyperUniquesAggregatorFactory(name, name); } + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + @Override public List getRequiredColumns() { diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 76a6c310778..d1c128bc038 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -19,7 +19,8 @@ package io.druid.segment; -import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -271,8 +272,10 @@ public class IndexIO case 6: case 7: log.info("Old version, re-persisting."); + QueryableIndex segmentToConvert = loadIndex(toConvert); new IndexMerger(mapper, this).append( - Arrays.asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))), + Arrays.asList(new QueryableIndexIndexableAdapter(segmentToConvert)), + null, converted, indexSpec ); @@ -1040,17 +1043,20 @@ public class IndexIO segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory(); } - Map metadata = null; + Metadata metadata = null; ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd"); if (metadataBB != null) { try { metadata = mapper.readValue( serializerUtils.readBytes(metadataBB, metadataBB.remaining()), - new TypeReference>() - { - } + Metadata.class ); } + catch (JsonParseException | JsonMappingException ex) { + // Any jackson deserialization errors are ignored e.g. if metadata contains some aggregator which + // is no longer supported then it is OK to not use the metadata instead of failing segment loading + log.warn(ex, "Failed to load metadata for segment [%s]", inDir); + } catch (IOException ex) { throw new IOException("Failed to read metadata", ex); } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 039303eb0e1..7659a07f53d 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -125,11 +125,10 @@ public class IndexMerger public File persist( final IncrementalIndex index, File outDir, - Map segmentMetadata, IndexSpec indexSpec ) throws IOException { - return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec); + return persist(index, index.getInterval(), outDir, indexSpec); } /** @@ -148,18 +147,16 @@ public class IndexMerger final IncrementalIndex index, final Interval dataInterval, File outDir, - Map segmentMetadata, IndexSpec indexSpec ) throws IOException { - return persist(index, dataInterval, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); + return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator()); } public File persist( final IncrementalIndex index, final Interval dataInterval, File outDir, - Map segmentMetadata, IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -197,14 +194,16 @@ public class IndexMerger ), index.getMetricAggs(), outDir, - segmentMetadata, indexSpec, progress ); } public File mergeQueryableIndex( - List indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec + List indexes, + final AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec ) throws IOException { return mergeQueryableIndex(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); @@ -238,7 +237,6 @@ public class IndexMerger indexAdapteres, metricAggs, outDir, - null, indexSpec, progress ); @@ -248,11 +246,10 @@ public class IndexMerger List indexes, final AggregatorFactory[] metricAggs, File outDir, - Map segmentMetadata, IndexSpec indexSpec ) throws IOException { - return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); + return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); } private static List getLexicographicMergedDimensions(List indexes) @@ -291,7 +288,6 @@ public class IndexMerger List indexes, final AggregatorFactory[] metricAggs, File outDir, - Map segmentMetadata, IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -371,11 +367,11 @@ public class IndexMerger return makeIndexFiles( indexes, + sortedMetricAggs, outDir, progress, mergedDimensions, mergedMetrics, - segmentMetadata, rowMergerFn, indexSpec ); @@ -395,11 +391,11 @@ public class IndexMerger final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); return makeIndexFiles( ImmutableList.of(adapter), + null, outDir, progress, Lists.newArrayList(adapter.getDimensionNames()), Lists.newArrayList(adapter.getMetricNames()), - null, new Function>, Iterable>() { @Nullable @@ -414,15 +410,20 @@ public class IndexMerger } } + public File append( - List indexes, File outDir, IndexSpec indexSpec + List indexes, AggregatorFactory[] aggregators, File outDir, IndexSpec indexSpec ) throws IOException { - return append(indexes, outDir, indexSpec, new BaseProgressIndicator()); + return append(indexes, aggregators, outDir, indexSpec, new BaseProgressIndicator()); } public File append( - List indexes, File outDir, IndexSpec indexSpec, ProgressIndicator progress + List indexes, + AggregatorFactory[] aggregators, + File outDir, + IndexSpec indexSpec, + ProgressIndicator progress ) throws IOException { FileUtils.deleteDirectory(outDir); @@ -470,20 +471,59 @@ public class IndexMerger } }; - return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec); + return makeIndexFiles( + indexes, + aggregators, + outDir, + progress, + mergedDimensions, + mergedMetrics, + rowMergerFn, + indexSpec + ); } protected File makeIndexFiles( final List indexes, + final AggregatorFactory[] metricAggs, final File outDir, final ProgressIndicator progress, final List mergedDimensions, final List mergedMetrics, - final Map segmentMetadata, final Function>, Iterable> rowMergerFn, final IndexSpec indexSpec ) throws IOException { + List metadataList = Lists.transform( + indexes, + new Function() + { + @Nullable + @Override + public Metadata apply(IndexableAdapter input) + { + return input.getMetadata(); + } + } + ); + + Metadata segmentMetadata = null; + if (metricAggs != null) { + AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length]; + for (int i = 0; i < metricAggs.length; i++) { + combiningMetricAggs[i] = metricAggs[i].getCombiningFactory(); + } + segmentMetadata = Metadata.merge( + metadataList, + combiningMetricAggs + ); + } else { + segmentMetadata = Metadata.merge( + metadataList, + null + ); + } + final Map valueTypes = Maps.newTreeMap(Ordering.natural().nullsFirst()); final Map metricTypeNames = Maps.newTreeMap(Ordering.natural().nullsFirst()); final Map columnCapabilities = Maps.newHashMap(); @@ -921,7 +961,7 @@ public class IndexMerger ) ); - if (segmentMetadata != null && !segmentMetadata.isEmpty()) { + if (segmentMetadata != null) { writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata); log.info("wrote metadata.drd in outDir[%s].", v8OutDir); @@ -1297,7 +1337,7 @@ public class IndexMerger return true; } - private void writeMetadataToFile(File metadataFile, Map metadata) throws IOException + private void writeMetadataToFile(File metadataFile, Metadata metadata) throws IOException { try (FileOutputStream metadataFileOutputStream = new FileOutputStream(metadataFile); FileChannel metadataFilechannel = metadataFileOutputStream.getChannel() diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 8cc5e8e3045..a5ab3ccc249 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -21,9 +21,6 @@ package io.druid.segment; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -39,16 +36,12 @@ import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; -import com.metamx.common.IAE; import com.metamx.common.ISE; -import com.metamx.common.guava.FunctionalIterable; -import com.metamx.common.guava.MergeIterable; import com.metamx.common.io.smoosh.FileSmoosher; import com.metamx.common.io.smoosh.SmooshedWriter; import com.metamx.common.logger.Logger; import io.druid.collections.CombiningIterable; import io.druid.common.utils.JodaUtils; -import io.druid.common.utils.SerializerUtils; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.Column; @@ -72,8 +65,6 @@ import io.druid.segment.data.IndexedRTree; import io.druid.segment.data.TmpFileIOPeon; import io.druid.segment.data.VSizeIndexedIntsWriter; import io.druid.segment.data.VSizeIndexedWriter; -import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.serde.ComplexColumnPartSerde; import io.druid.segment.serde.ComplexColumnSerializer; import io.druid.segment.serde.ComplexMetricSerde; @@ -116,11 +107,11 @@ public class IndexMergerV9 extends IndexMerger @Override protected File makeIndexFiles( final List adapters, + final AggregatorFactory[] metricAggs, final File outDir, final ProgressIndicator progress, final List mergedDimensions, final List mergedMetrics, - final Map segmentMetadata, final Function>, Iterable> rowMergerFn, final IndexSpec indexSpec ) throws IOException @@ -128,6 +119,36 @@ public class IndexMergerV9 extends IndexMerger progress.start(); progress.progress(); + List metadataList = Lists.transform( + adapters, + new Function() + { + @Nullable + @Override + public Metadata apply(IndexableAdapter input) + { + return input.getMetadata(); + } + } + ); + + Metadata segmentMetadata = null; + if (metricAggs != null) { + AggregatorFactory[] combiningMetricAggs = new AggregatorFactory[metricAggs.length]; + for (int i = 0; i < metricAggs.length; i++) { + combiningMetricAggs[i] = metricAggs[i].getCombiningFactory(); + } + segmentMetadata = Metadata.merge( + metadataList, + combiningMetricAggs + ); + } else { + segmentMetadata = Metadata.merge( + metadataList, + null + ); + } + final IOPeon ioPeon = new TmpFileIOPeon(false); final FileSmoosher v9Smoosher = new FileSmoosher(outDir); final File v9TmpDir = new File(outDir, "v9-tmp"); @@ -221,10 +242,10 @@ public class IndexMergerV9 extends IndexMerger private void makeMetadataBinary( final FileSmoosher v9Smoosher, final ProgressIndicator progress, - final Map segmentMetadata + final Metadata segmentMetadata ) throws IOException { - if (segmentMetadata != null && !segmentMetadata.isEmpty()) { + if (segmentMetadata != null) { progress.startSection("make metadata.drd"); v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(segmentMetadata))); progress.stopSection("make metadata.drd"); diff --git a/processing/src/main/java/io/druid/segment/IndexableAdapter.java b/processing/src/main/java/io/druid/segment/IndexableAdapter.java index 4e723627f6e..8ca187d6194 100644 --- a/processing/src/main/java/io/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/IndexableAdapter.java @@ -49,4 +49,6 @@ public interface IndexableAdapter String getMetricType(String metric); ColumnCapabilities getCapabilities(String column); + + Metadata getMetadata(); } diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java new file mode 100644 index 00000000000..3568cb59817 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/Metadata.java @@ -0,0 +1,164 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.query.aggregation.AggregatorFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + */ +public class Metadata +{ + // container is used for arbitrary key-value pairs in segment metadata e.g. + // kafka firehose uses it to store commit offset + @JsonProperty + private final Map container; + + @JsonProperty + private AggregatorFactory[] aggregators; + + public Metadata() + { + container = new ConcurrentHashMap<>(); + } + + public AggregatorFactory[] getAggregators() + { + return aggregators; + } + + public Metadata setAggregators(AggregatorFactory[] aggregators) + { + this.aggregators = aggregators; + return this; + } + + public Metadata putAll(Map other) + { + if (other != null) { + container.putAll(other); + } + return this; + } + + public Object get(String key) + { + return container.get(key); + } + + public Metadata put(String key, Object value) + { + if (value != null) { + container.put(key, value); + } + return this; + } + + // arbitrary key-value pairs from the metadata just follow the semantics of last one wins if same + // key exists in multiple input Metadata containers + // for others e.g. Aggregators, appropriate merging is done + public static Metadata merge( + List toBeMerged, + AggregatorFactory[] overrideMergedAggregators + ) + { + if (toBeMerged == null || toBeMerged.size() == 0) { + return null; + } + + boolean foundSomeMetadata = false; + Map mergedContainer = new HashMap<>(); + List aggregatorsToMerge = overrideMergedAggregators == null + ? new ArrayList() + : null; + + for (Metadata metadata : toBeMerged) { + if (metadata != null) { + foundSomeMetadata = true; + if (aggregatorsToMerge != null) { + aggregatorsToMerge.add(metadata.getAggregators()); + } + mergedContainer.putAll(metadata.container); + } else { + //if metadata and hence aggregators for some segment being merged are unknown then + //final merged segment should not have aggregators in the metadata + aggregatorsToMerge = null; + } + } + + if(!foundSomeMetadata) { + return null; + } + + Metadata result = new Metadata(); + if (aggregatorsToMerge != null) { + result.setAggregators(AggregatorFactory.mergeAggregators(aggregatorsToMerge)); + } else { + result.setAggregators(overrideMergedAggregators); + } + result.container.putAll(mergedContainer); + return result; + + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Metadata metadata = (Metadata) o; + + if (!container.equals(metadata.container)) { + return false; + } + // Probably incorrect - comparing Object[] arrays with Arrays.equals + return Arrays.equals(aggregators, metadata.aggregators); + + } + + @Override + public int hashCode() + { + int result = container.hashCode(); + result = 31 * result + (aggregators != null ? Arrays.hashCode(aggregators) : 0); + return result; + } + + @Override + public String toString() + { + return "Metadata{" + + "container=" + container + + ", aggregators=" + Arrays.toString(aggregators) + + '}'; + } +} diff --git a/processing/src/main/java/io/druid/segment/QueryableIndex.java b/processing/src/main/java/io/druid/segment/QueryableIndex.java index c94ce33eaaa..3daa70896d8 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndex.java @@ -25,7 +25,6 @@ import org.joda.time.Interval; import java.io.Closeable; import java.io.IOException; -import java.util.Map; /** */ @@ -36,7 +35,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable public Indexed getColumnNames(); public Indexed getAvailableDimensions(); public BitmapFactory getBitmapFactoryForDimensions(); - public Map getMetaData(); + public Metadata getMetadata(); /** * The close method shouldn't actually be here as this is nasty. We will adjust it in the future. diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index 31c555bb61d..d05c0e95c6d 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -62,6 +62,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter private final int numRows; private final QueryableIndex input; private final List availableDimensions; + private final Metadata metadata; public QueryableIndexIndexableAdapter(QueryableIndex input) { @@ -83,6 +84,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter log.info("No dictionary on dimension[%s]", dim); } } + + this.metadata = input.getMetadata(); } @Override @@ -379,8 +382,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter } if (lastVal != null) { if (GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) { - throw new ISE("Value[%s] is less than the last value[%s] I have, cannot be.", - value, lastVal); + throw new ISE( + "Value[%s] is less than the last value[%s] I have, cannot be.", + value, lastVal + ); } return new EmptyIndexedInts(); } @@ -398,12 +403,20 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter } return ret; } else if (compareResult < 0) { - throw new ISE("Skipped currValue[%s], currIndex[%,d]; incoming value[%s]", - currVal, currIndex, value); + throw new ISE( + "Skipped currValue[%s], currIndex[%,d]; incoming value[%s]", + currVal, currIndex, value + ); } else { return new EmptyIndexedInts(); } } }; } + + @Override + public Metadata getMetadata() + { + return metadata; + } } diff --git a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java index fbd055a4fc6..892ca62fb4f 100644 --- a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java @@ -99,4 +99,10 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter { return baseAdapter.getBitmapIndexSeeker(dimension); } + + @Override + public Metadata getMetadata() + { + return baseAdapter.getMetadata(); + } } diff --git a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java index 78a9c4a3c66..5d6aa0452c3 100644 --- a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java @@ -39,7 +39,7 @@ public class SimpleQueryableIndex implements QueryableIndex private final BitmapFactory bitmapFactory; private final Map columns; private final SmooshedFileMapper fileMapper; - private final Map metadata; + private final Metadata metadata; public SimpleQueryableIndex( Interval dataInterval, @@ -48,7 +48,7 @@ public class SimpleQueryableIndex implements QueryableIndex BitmapFactory bitmapFactory, Map columns, SmooshedFileMapper fileMapper, - Map metadata + Metadata metadata ) { Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME)); @@ -104,7 +104,7 @@ public class SimpleQueryableIndex implements QueryableIndex } @Override - public Map getMetaData() + public Metadata getMetadata() { return metadata; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index ba66aee9157..1c0e1d0c1fe 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -44,6 +44,7 @@ import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; +import io.druid.segment.Metadata; import io.druid.segment.ObjectColumnSelector; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -262,6 +263,7 @@ public abstract class IncrementalIndex implements Iterable, private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; + private final Metadata metadata; private final Map metricDescs; private final Map dimensionDescs; @@ -299,6 +301,8 @@ public abstract class IncrementalIndex implements Iterable, this.rowTransformers = new CopyOnWriteArrayList<>(); this.deserializeComplexMetrics = deserializeComplexMetrics; + this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics)); + this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); this.columnCapabilities = Maps.newHashMap(); @@ -621,6 +625,20 @@ public abstract class IncrementalIndex implements Iterable, return getFacts().subMap(start, end); } + public Metadata getMetadata() + { + return metadata; + } + + private static AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) + { + AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; i++) { + combiningAggregators[i] = aggregators[i].getCombiningFactory(); + } + return combiningAggregators; + } + @Override public Iterator iterator() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 3b124f9f0d7..d9c64f84554 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -29,6 +29,7 @@ import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.segment.IndexableAdapter; +import io.druid.segment.Metadata; import io.druid.segment.Rowboat; import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.column.ColumnCapabilities; @@ -58,6 +59,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter private final IncrementalIndex index; private final Map> invertedIndexes; private final Set hasNullValueDimensions; + private final Metadata metadata; public IncrementalIndexAdapter( Interval dataInterval, IncrementalIndex index, BitmapFactory bitmapFactory @@ -65,6 +67,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter { this.dataInterval = dataInterval; this.index = index; + this.metadata = index.getMetadata(); + this.invertedIndexes = Maps.newHashMap(); /* Sometimes it's hard to tell whether one dimension contains a null value or not. * If one dimension had show a null or empty value explicitly, then yes, it contains @@ -398,4 +402,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter } } + + @Override + public Metadata getMetadata() + { + return metadata; + } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index aaf06be2b73..e3d8f947ee2 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -123,7 +123,7 @@ public class MultiValuedDimensionTest persistedSegmentDir = Files.createTempDir(); TestHelper.getTestIndexMerger() - .persist(incrementalIndex, persistedSegmentDir, ImmutableMap.of(), new IndexSpec()); + .persist(incrementalIndex, persistedSegmentDir, new IndexSpec()); queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir); } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 1f7534f101f..7c6c6c37a55 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -326,7 +326,7 @@ public class AggregationTestHelper catch (IndexSizeExceededException ex) { File tmp = tempFolder.newFolder(); toMerge.add(tmp); - indexMerger.persist(index, tmp, null, new IndexSpec()); + indexMerger.persist(index, tmp, new IndexSpec()); index.close(); index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); } @@ -335,7 +335,7 @@ public class AggregationTestHelper if (toMerge.size() > 0) { File tmp = tempFolder.newFolder(); toMerge.add(tmp); - indexMerger.persist(index, tmp, null, new IndexSpec()); + indexMerger.persist(index, tmp, new IndexSpec()); List indexes = new ArrayList<>(toMerge.size()); for (File file : toMerge) { @@ -347,7 +347,7 @@ public class AggregationTestHelper qi.close(); } } else { - indexMerger.persist(index, outDir, null, new IndexSpec()); + indexMerger.persist(index, outDir, new IndexSpec()); } } finally { diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregatorFactoryTest.java b/processing/src/test/java/io/druid/query/aggregation/AggregatorFactoryTest.java new file mode 100644 index 00000000000..360caf4b0bb --- /dev/null +++ b/processing/src/test/java/io/druid/query/aggregation/AggregatorFactoryTest.java @@ -0,0 +1,80 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query.aggregation; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + */ +public class AggregatorFactoryTest +{ + + @Test + public void testMergeAggregators() + { + Assert.assertNull(AggregatorFactory.mergeAggregators(null)); + Assert.assertNull(AggregatorFactory.mergeAggregators(ImmutableList.of())); + + List aggregatorsToBeMerged = new ArrayList<>(); + + aggregatorsToBeMerged.add(null); + Assert.assertNull(AggregatorFactory.mergeAggregators(aggregatorsToBeMerged)); + + AggregatorFactory[] emptyAggFactory = new AggregatorFactory[0]; + + aggregatorsToBeMerged.clear(); + aggregatorsToBeMerged.add(emptyAggFactory); + Assert.assertArrayEquals(emptyAggFactory, AggregatorFactory.mergeAggregators(aggregatorsToBeMerged)); + + aggregatorsToBeMerged.clear(); + aggregatorsToBeMerged.add(emptyAggFactory); + aggregatorsToBeMerged.add(null); + Assert.assertNull(AggregatorFactory.mergeAggregators(aggregatorsToBeMerged)); + + aggregatorsToBeMerged.clear(); + AggregatorFactory[] af1 = new AggregatorFactory[]{ + new LongMaxAggregatorFactory("name", "fieldName1") + }; + AggregatorFactory[] af2 = new AggregatorFactory[]{ + new LongMaxAggregatorFactory("name", "fieldName2") + }; + Assert.assertArrayEquals( + new AggregatorFactory[]{ + new LongMaxAggregatorFactory("name", "name") + }, + AggregatorFactory.mergeAggregators(ImmutableList.of(af1, af2)) + ); + + aggregatorsToBeMerged.clear(); + af1 = new AggregatorFactory[]{ + new LongMaxAggregatorFactory("name", "fieldName1") + }; + af2 = new AggregatorFactory[]{ + new DoubleMaxAggregatorFactory("name", "fieldName2") + }; + Assert.assertNull(AggregatorFactory.mergeAggregators(ImmutableList.of(af1, af2)) + ); + } +} diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index 1536e805f50..cf14cec1aac 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -63,7 +63,6 @@ public class EmptyIndexTest Lists.newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir, - null, new IndexSpec() ); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 53ed45453fe..514980cde0b 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -142,7 +142,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist, tempDir, - null, indexSpec ) ) @@ -153,6 +152,11 @@ public class IndexMergerTest Assert.assertEquals(3, index.getColumnNames().size()); assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); + + Assert.assertArrayEquals( + IncrementalIndexTest.getDefaultCombiningAggregatorFactories(), + index.getMetadata().getAggregators() + ); } @Test @@ -180,7 +184,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist, tempDir, - null, indexSpec ) ) @@ -210,6 +213,7 @@ public class IndexMergerTest checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim2", "2")); } + @Test public void testMergeRetainsValues() throws Exception { @@ -231,7 +235,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -246,12 +249,12 @@ public class IndexMergerTest Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - + AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")}; QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedAggregators, mergedDir, indexSpec ) @@ -276,7 +279,8 @@ public class IndexMergerTest IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist); - Map segmentMetadata = ImmutableMap.of("key", "value"); + Map metadataElems = ImmutableMap.of("key", "value"); + toPersist.getMetadata().putAll(metadataElems); final File tempDir = temporaryFolder.newFolder(); QueryableIndex index = closer.closeLater( @@ -284,7 +288,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist, tempDir, - segmentMetadata, indexSpec ) ) @@ -296,7 +299,14 @@ public class IndexMergerTest assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); - Assert.assertEquals(segmentMetadata, index.getMetaData()); + Assert.assertEquals( + new Metadata() + .setAggregators( + IncrementalIndexTest.getDefaultCombiningAggregatorFactories() + ) + .putAll(metadataElems), + index.getMetadata() + ); } @Test @@ -338,7 +348,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -353,7 +362,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist2, tempDir2, - null, indexSpec ) ) @@ -363,11 +371,14 @@ public class IndexMergerTest Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); Assert.assertEquals(3, index2.getColumnNames().size()); + AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{ + new CountAggregatorFactory("count") + }; QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( Arrays.asList(index1, index2), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedAggregators, mergedDir, indexSpec ) @@ -380,6 +391,11 @@ public class IndexMergerTest assertDimCompression(index2, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + + Assert.assertArrayEquals( + getCombiningAggregators(mergedAggregators), + merged.getMetadata().getAggregators() + ); } @Test @@ -422,7 +438,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tmpDir1, - null, indexSpec ) ) @@ -432,7 +447,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tmpDir2, - null, indexSpec ) ) @@ -479,9 +493,9 @@ public class IndexMergerTest ); QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex( - INDEX_MERGER.append( - ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + INDEX_IO.loadIndex( + INDEX_MERGER.append( + ImmutableList.of(incrementalAdapter), null, tempDir1, indexSpec ) ) ); @@ -493,12 +507,17 @@ public class IndexMergerTest Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); + Assert.assertArrayEquals( + IncrementalIndexTest.getDefaultCombiningAggregatorFactories(), + index1.getMetadata().getAggregators() + ); + AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")}; QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedAggregators, mergedDir, indexSpec ) @@ -513,6 +532,11 @@ public class IndexMergerTest assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(merged, indexSpec.getDimensionCompressionStrategy()); + + Assert.assertArrayEquals( + getCombiningAggregators(mergedAggregators), + merged.getMetadata().getAggregators() + ); } @Test @@ -536,7 +560,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -551,19 +574,18 @@ public class IndexMergerTest Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - IndexSpec newSpec = new IndexSpec( indexSpec.getBitmapSerdeFactory(), "lz4".equals(indexSpec.getDimensionCompression()) ? "lzf" : "lz4", "lz4".equals(indexSpec.getMetricCompression()) ? "lzf" : "lz4" ); - + AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")}; QueryableIndex merged = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( ImmutableList.of(index1), - new AggregatorFactory[]{new CountAggregatorFactory("count")}, + mergedAggregators, mergedDir, newSpec ) @@ -585,15 +607,15 @@ public class IndexMergerTest public void testConvertSame() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( - new AggregatorFactory[]{ - new LongSumAggregatorFactory( - "longSum1", - "dim1" - ), - new LongSumAggregatorFactory("longSum2", "dim2") - } - ); + final AggregatorFactory[] aggregators = new AggregatorFactory[]{ + new LongSumAggregatorFactory( + "longSum1", + "dim1" + ), + new LongSumAggregatorFactory("longSum2", "dim2") + }; + + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -606,7 +628,7 @@ public class IndexMergerTest ); QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, null, indexSpec)) + INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, indexSpec)) ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -636,6 +658,11 @@ public class IndexMergerTest assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(converted, indexSpec.getDimensionCompressionStrategy()); + + Assert.assertArrayEquals( + getCombiningAggregators(aggregators), + converted.getMetadata().getAggregators() + ); } @@ -643,15 +670,15 @@ public class IndexMergerTest public void testConvertDifferent() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( - new AggregatorFactory[]{ - new LongSumAggregatorFactory( - "longSum1", - "dim1" - ), - new LongSumAggregatorFactory("longSum2", "dim2") - } - ); + final AggregatorFactory[] aggregators = new AggregatorFactory[]{ + new LongSumAggregatorFactory( + "longSum1", + "dim1" + ), + new LongSumAggregatorFactory("longSum2", "dim2") + }; + + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -668,7 +695,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -708,6 +734,11 @@ public class IndexMergerTest assertDimCompression(index1, indexSpec.getDimensionCompressionStrategy()); assertDimCompression(converted, newSpec.getDimensionCompressionStrategy()); + + Assert.assertArrayEquals( + getCombiningAggregators(aggregators), + converted.getMetadata().getAggregators() + ); } private void assertDimCompression(QueryableIndex index, CompressedObjectStrategy.CompressionStrategy expectedStrategy) @@ -736,7 +767,6 @@ public class IndexMergerTest Assert.assertEquals(expectedStrategy, strategy); } - @Test public void testNonLexicographicDimOrderMerge() throws Exception { @@ -754,7 +784,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist1, tmpDir, - null, indexSpec ) ) @@ -765,7 +794,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist2, tmpDir2, - null, indexSpec ) ) @@ -776,7 +804,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersist3, tmpDir3, - null, indexSpec ) ) @@ -826,7 +853,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersistA, tmpDirA, - null, indexSpec ) ) @@ -837,7 +863,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersistB, tmpDirB, - null, indexSpec ) ) @@ -954,7 +979,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersistA, tmpDirA, - null, indexSpec ) ) @@ -965,7 +989,6 @@ public class IndexMergerTest INDEX_MERGER.persist( toPersistB, tmpDirB, - null, indexSpec ) ) @@ -1097,4 +1120,13 @@ public class IndexMergerTest return toPersist1; } + + private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) + { + AggregatorFactory[] combiningAggregators = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; i++) { + combiningAggregators[i] = aggregators[i].getCombiningFactory(); + } + return combiningAggregators; + } } diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java index db8cf9f8e17..2f2a249dffd 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java @@ -170,12 +170,13 @@ public class IndexMergerV9CompatibilityTest DEFAULT_AGG_FACTORIES, 1000000 ); + toPersist.getMetadata().put("key", "value"); for (InputRow event : events) { toPersist.add(event); } tmpDir = Files.createTempDir(); persistTmpDir = new File(tmpDir, "persistDir"); - INDEX_MERGER.persist(toPersist, persistTmpDir, null, INDEX_SPEC); + INDEX_MERGER.persist(toPersist, persistTmpDir, INDEX_SPEC); } @After @@ -191,10 +192,9 @@ public class IndexMergerV9CompatibilityTest QueryableIndex index = null; try { outDir = Files.createTempDir(); - Map segmentMetadata = ImmutableMap.of("key", "value"); - index = INDEX_IO.loadIndex(INDEX_MERGER_V9.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC)); + index = INDEX_IO.loadIndex(INDEX_MERGER_V9.persist(toPersist, outDir, INDEX_SPEC)); - Assert.assertEquals(segmentMetadata, index.getMetaData()); + Assert.assertEquals("value", index.getMetadata().get("key")); } finally { if (index != null) { @@ -237,6 +237,7 @@ public class IndexMergerV9CompatibilityTest { final File outDir = INDEX_MERGER.append( ImmutableList.of(new QueryableIndexIndexableAdapter(closer.closeLater(INDEX_IO.loadIndex(inDir)))), + null, tmpDir, INDEX_SPEC ); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java b/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java index e75854e0ec8..8930c28e242 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9Test.java @@ -57,7 +57,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; @RunWith(Parameterized.class) public class IndexMergerV9Test @@ -142,7 +141,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist, tempDir, - null, indexSpec ) ) @@ -180,7 +178,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist, tempDir, - null, indexSpec ) ) @@ -217,8 +214,7 @@ public class IndexMergerV9Test IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist); - - Map segmentMetadata = ImmutableMap.of("key", "value"); + toPersist.getMetadata().put("key", "value"); final File tempDir = temporaryFolder.newFolder(); QueryableIndex index = closer.closeLater( @@ -226,7 +222,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist, tempDir, - segmentMetadata, indexSpec ) ) @@ -238,7 +233,7 @@ public class IndexMergerV9Test assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); - Assert.assertEquals(segmentMetadata, index.getMetaData()); + Assert.assertEquals("value", index.getMetadata().get("key")); } @Test @@ -280,7 +275,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -295,7 +289,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist2, tempDir2, - null, indexSpec ) ) @@ -364,7 +357,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tmpDir1, - null, indexSpec ) ) @@ -374,7 +366,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tmpDir2, - null, indexSpec ) ) @@ -425,7 +416,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -482,7 +472,7 @@ public class IndexMergerV9Test QueryableIndex index1 = closer.closeLater( INDEX_IO.loadIndex( INDEX_MERGER.append( - ImmutableList.of(incrementalAdapter), tempDir1, indexSpec + ImmutableList.of(incrementalAdapter), null, tempDir1, indexSpec ) ) ); @@ -537,7 +527,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -607,7 +596,7 @@ public class IndexMergerV9Test ); QueryableIndex index1 = closer.closeLater( - INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, null, indexSpec)) + INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist1, tempDir1, indexSpec)) ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -669,7 +658,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersist1, tempDir1, - null, indexSpec ) ) @@ -751,7 +739,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersistA, tmpDirA, - null, indexSpec ) ) @@ -762,7 +749,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersistB, tmpDirB, - null, indexSpec ) ) @@ -879,7 +865,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersistA, tmpDirA, - null, indexSpec ) ) @@ -890,7 +875,6 @@ public class IndexMergerV9Test INDEX_MERGER.persist( toPersistB, tmpDirB, - null, indexSpec ) ) diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java index 568f7d48023..7070fbc5628 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java @@ -258,7 +258,7 @@ public class IndexMergerV9WithSpatialIndexTest tmpFile.mkdirs(); tmpFile.deleteOnExit(); - INDEX_MERGER_V9.persist(theIndex, tmpFile, null, indexSpec); + INDEX_MERGER_V9.persist(theIndex, tmpFile, indexSpec); return INDEX_IO.loadIndex(tmpFile); } @@ -478,9 +478,9 @@ public class IndexMergerV9WithSpatialIndexTest mergedFile.mkdirs(); mergedFile.deleteOnExit(); - INDEX_MERGER_V9.persist(first, DATA_INTERVAL, firstFile, null, indexSpec); - INDEX_MERGER_V9.persist(second, DATA_INTERVAL, secondFile, null, indexSpec); - INDEX_MERGER_V9.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec); + INDEX_MERGER_V9.persist(first, DATA_INTERVAL, firstFile, indexSpec); + INDEX_MERGER_V9.persist(second, DATA_INTERVAL, secondFile, indexSpec); + INDEX_MERGER_V9.persist(third, DATA_INTERVAL, thirdFile, indexSpec); QueryableIndex mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER_V9.mergeQueryableIndex( diff --git a/processing/src/test/java/io/druid/segment/MetadataTest.java b/processing/src/test/java/io/druid/segment/MetadataTest.java new file mode 100644 index 00000000000..c2a4d52877b --- /dev/null +++ b/processing/src/test/java/io/druid/segment/MetadataTest.java @@ -0,0 +1,118 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.LongMaxAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + */ +public class MetadataTest +{ + @Test + public void testSerde() throws Exception + { + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + Metadata metadata = new Metadata(); + metadata.put("k", "v"); + + AggregatorFactory[] aggregators = new AggregatorFactory[] + { + new LongSumAggregatorFactory("out", "in") + }; + metadata.setAggregators(aggregators); + + Metadata other = jsonMapper.readValue( + jsonMapper.writeValueAsString(metadata), + Metadata.class + ); + + Assert.assertEquals(metadata, other); + } + + @Test + public void testMerge() + { + Assert.assertNull(Metadata.merge(null, null)); + Assert.assertNull(Metadata.merge(ImmutableList.of(), null)); + + List metadataToBeMerged = new ArrayList<>(); + + metadataToBeMerged.add(null); + Assert.assertNull(Metadata.merge(metadataToBeMerged, null)); + + //sanity merge check + AggregatorFactory[] aggs = new AggregatorFactory[] { + new LongMaxAggregatorFactory("n", "f") + }; + Metadata m1 = new Metadata(); + m1.put("k", "v"); + m1.setAggregators(aggs); + + Metadata m2 = new Metadata(); + m2.put("k", "v"); + m2.setAggregators(aggs); + + Metadata merged = new Metadata(); + merged.put("k", "v"); + merged.setAggregators( + new AggregatorFactory[]{ + new LongMaxAggregatorFactory("n", "n") + } + ); + Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null)); + + //merge check with one metadata being null + metadataToBeMerged.clear(); + metadataToBeMerged.add(m1); + metadataToBeMerged.add(m2); + metadataToBeMerged.add(null); + + merged.setAggregators(null); + Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null)); + + //merge check with client explicitly providing merged aggregators + AggregatorFactory[] explicitAggs = new AggregatorFactory[] { + new DoubleMaxAggregatorFactory("x", "y") + }; + merged.setAggregators(explicitAggs); + + Assert.assertEquals( + merged, + Metadata.merge(metadataToBeMerged, explicitAggs) + ); + + Assert.assertEquals( + merged, + Metadata.merge(ImmutableList.of(m1, m2), explicitAggs) + ); + } +} diff --git a/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java index 4f353a13ed0..dbaa6f5eb31 100644 --- a/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/QueryableIndexIndexableAdapterTest.java @@ -19,8 +19,7 @@ package io.druid.segment; -import java.io.File; - +import com.metamx.common.ISE; import io.druid.segment.column.BitmapIndexSeeker; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.ConciseBitmapSerdeFactory; @@ -28,13 +27,12 @@ import io.druid.segment.data.IncrementalIndexTest; import io.druid.segment.data.IndexedInts; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; - import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import com.metamx.common.ISE; +import java.io.File; public class QueryableIndexIndexableAdapterTest { private final static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger(); @@ -69,7 +67,6 @@ public class QueryableIndexIndexableAdapterTest { INDEX_MERGER.persist( toPersist, tempDir, - null, INDEX_SPEC ) ) diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 09a943951d5..ab8e2019ab3 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -191,8 +191,8 @@ public class SchemalessIndex mergedFile.mkdirs(); mergedFile.deleteOnExit(); - INDEX_MERGER.persist(top, topFile, null, indexSpec); - INDEX_MERGER.persist(bottom, bottomFile, null, indexSpec); + INDEX_MERGER.persist(top, topFile, indexSpec); + INDEX_MERGER.persist(bottom, bottomFile, indexSpec); mergedIndex = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( @@ -361,7 +361,7 @@ public class SchemalessIndex tmpFile.mkdirs(); tmpFile.deleteOnExit(); - INDEX_MERGER.persist(rowIndex, tmpFile, null, indexSpec); + INDEX_MERGER.persist(rowIndex, tmpFile, indexSpec); rowPersistedIndexes.add(INDEX_IO.loadIndex(tmpFile)); } } @@ -421,7 +421,7 @@ public class SchemalessIndex theFile.mkdirs(); theFile.deleteOnExit(); filesToMap.add(theFile); - INDEX_MERGER.persist(index, theFile, null, indexSpec); + INDEX_MERGER.persist(index, theFile, indexSpec); } return filesToMap; @@ -495,7 +495,7 @@ public class SchemalessIndex ) ); - return INDEX_IO.loadIndex(INDEX_MERGER.append(adapters, mergedFile, indexSpec)); + return INDEX_IO.loadIndex(INDEX_MERGER.append(adapters, null, mergedFile, indexSpec)); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index a4ff3b60233..d86ff55ad09 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -38,9 +38,10 @@ public class TestHelper private static final IndexMerger INDEX_MERGER; private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexIO INDEX_IO; - public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + public static final ObjectMapper JSON_MAPPER; static { + JSON_MAPPER = new DefaultObjectMapper(); INDEX_IO = new IndexIO( JSON_MAPPER, new ColumnConfig() @@ -56,6 +57,11 @@ public class TestHelper INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO); } + public static ObjectMapper getTestObjectMapper() + { + return JSON_MAPPER; + } + public static IndexMerger getTestIndexMerger() { diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index bffe3a755f2..737f0acb110 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -143,8 +143,8 @@ public class TestIndex mergedFile.mkdirs(); mergedFile.deleteOnExit(); - INDEX_MERGER.persist(top, DATA_INTERVAL, topFile, null, indexSpec); - INDEX_MERGER.persist(bottom, DATA_INTERVAL, bottomFile, null, indexSpec); + INDEX_MERGER.persist(top, DATA_INTERVAL, topFile, indexSpec); + INDEX_MERGER.persist(bottom, DATA_INTERVAL, bottomFile, indexSpec); mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( @@ -243,7 +243,7 @@ public class TestIndex someTmpFile.mkdirs(); someTmpFile.deleteOnExit(); - INDEX_MERGER.persist(index, someTmpFile, null, indexSpec); + INDEX_MERGER.persist(index, someTmpFile, indexSpec); return INDEX_IO.loadIndex(someTmpFile); } catch (IOException e) { diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 25287651564..3a174876331 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -128,6 +128,16 @@ public class IncrementalIndexTest ); } + public static AggregatorFactory[] getDefaultAggregatorFactories() + { + return defaultAggregatorFactories; + } + + public static AggregatorFactory[] getDefaultCombiningAggregatorFactories() + { + return defaultCombiningAggregatorFactories; + } + public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories) { if (null == aggregatorFactories) { @@ -188,6 +198,10 @@ public class IncrementalIndexTest ) }; + private static final AggregatorFactory[] defaultCombiningAggregatorFactories = new AggregatorFactory[]{ + defaultAggregatorFactories[0].getCombiningFactory() + }; + @Test public void testCaseSensitivity() throws Exception { diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 1dc12ad6619..4fe65daedca 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -237,7 +237,7 @@ public class SpatialFilterBonusTest tmpFile.mkdirs(); tmpFile.deleteOnExit(); - INDEX_MERGER.persist(theIndex, tmpFile, null, indexSpec); + INDEX_MERGER.persist(theIndex, tmpFile, indexSpec); return INDEX_IO.loadIndex(tmpFile); } @@ -417,9 +417,9 @@ public class SpatialFilterBonusTest mergedFile.mkdirs(); mergedFile.deleteOnExit(); - INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, null, indexSpec); - INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, null, indexSpec); - INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec); + INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, indexSpec); + INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, indexSpec); + INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, indexSpec); QueryableIndex mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index 46cb42cb531..2f20f61caa7 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -266,7 +266,7 @@ public class SpatialFilterTest tmpFile.mkdirs(); tmpFile.deleteOnExit(); - INDEX_MERGER.persist(theIndex, tmpFile, null, indexSpec); + INDEX_MERGER.persist(theIndex, tmpFile, indexSpec); return INDEX_IO.loadIndex(tmpFile); } @@ -486,9 +486,9 @@ public class SpatialFilterTest mergedFile.mkdirs(); mergedFile.deleteOnExit(); - INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, null, indexSpec); - INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, null, indexSpec); - INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec); + INDEX_MERGER.persist(first, DATA_INTERVAL, firstFile, indexSpec); + INDEX_MERGER.persist(second, DATA_INTERVAL, secondFile, indexSpec); + INDEX_MERGER.persist(third, DATA_INTERVAL, thirdFile, indexSpec); QueryableIndex mergedRealtime = INDEX_IO.loadIndex( INDEX_MERGER.mergeQueryableIndex( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 421f25c5ff5..59cebb931de 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -63,6 +63,7 @@ import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; +import io.druid.segment.Metadata; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.ReferenceCountingSegment; @@ -86,6 +87,7 @@ import org.joda.time.Interval; import org.joda.time.Period; import javax.annotation.Nullable; +import javax.ws.rs.HEAD; import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; @@ -403,7 +405,7 @@ public class RealtimePlumber implements Plumber final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); - final Map metadata = committer.getMetadata() == null ? null : + final Map metadataElems = committer.getMetadata() == null ? null : ImmutableMap.of( COMMIT_METADATA_KEY, committer.getMetadata(), @@ -447,7 +449,7 @@ public class RealtimePlumber implements Plumber for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount( persistHydrant( - pair.lhs, schema, pair.rhs, metadata + pair.lhs, schema, pair.rhs, metadataElems ) ); } @@ -767,7 +769,7 @@ public class RealtimePlumber implements Plumber //at some point. continue; } - Map segmentMetadata = queryableIndex.getMetaData(); + Metadata segmentMetadata = queryableIndex.getMetadata(); if (segmentMetadata != null) { Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); if (timestampObj != null) { @@ -775,10 +777,10 @@ public class RealtimePlumber implements Plumber if (timestamp > latestCommitTime) { log.info( "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", - queryableIndex.getMetaData(), timestamp, latestCommitTime + queryableIndex.getMetadata(), timestamp, latestCommitTime ); latestCommitTime = timestamp; - metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); + metadata = queryableIndex.getMetadata().get(COMMIT_METADATA_KEY); } } } @@ -1000,7 +1002,7 @@ public class RealtimePlumber implements Plumber FireHydrant indexToPersist, DataSchema schema, Interval interval, - Map metaData + Map metadataElems ) { synchronized (indexToPersist) { @@ -1016,7 +1018,7 @@ public class RealtimePlumber implements Plumber "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", schema.getDataSource(), interval, - metaData, + metadataElems, indexToPersist ); try { @@ -1024,11 +1026,11 @@ public class RealtimePlumber implements Plumber final IndexSpec indexSpec = config.getIndexSpec(); + indexToPersist.getIndex().getMetadata().putAll(metadataElems); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - metaData, indexSpec ); diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index e320d5934d0..beeada2b51b 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -124,7 +124,7 @@ public class IngestSegmentFirehoseTest for (String line : rows) { index.add(parser.parse(line)); } - indexMerger.persist(index, segmentDir, null, new IndexSpec()); + indexMerger.persist(index, segmentDir, new IndexSpec()); } finally { if (index != null) {