diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 9124b9b4c1d..4539232b0f1 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -117,9 +117,13 @@ public class IndexGeneratorJob implements Jobby FileSystem fs = descriptorInfoDir.getFileSystem(conf); for (FileStatus status : fs.listStatus(descriptorInfoDir)) { - final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class); + final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open( + status.getPath()), DataSegmentAndIndexZipFilePath.class); publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath); - log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId()); + log.info( + "Adding segment %s to the list of published segments", + segmentAndIndexZipFilePath.getSegment().getId() + ); } } catch (FileNotFoundException e) { @@ -303,10 +307,10 @@ public class IndexGeneratorJob implements Jobby // Build the incremental-index according to the spec that was chosen by the user IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder() - .setIndexSchema(indexSchema) - .setMaxRowCount(tuningConfig.getMaxRowsInMemory()) - .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault()) - .build(); + .setIndexSchema(indexSchema) + .setMaxRowCount(tuningConfig.getMaxRowsInMemory()) + .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault()) + .build(); if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities); @@ -609,7 +613,18 @@ public class IndexGeneratorJob implements Jobby { boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup(); return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .mergeQueryableIndex(indexes, rollup, aggs, null, file, config.getIndexSpec(), progressIndicator, null, -1); + .mergeQueryableIndex( + indexes, + rollup, + aggs, + null, + file, + config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), + progressIndicator, + null, + -1 + ); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index 76928c29a25..39517be6b79 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexMergerV9; @@ -339,8 +340,11 @@ abstract class PartialSegmentMergeTask extends PerfectRollu indexesToMerge, dataSchema.getGranularitySpec().isRollup(), dataSchema.getAggregators(), + null, outDir, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), + new BaseProgressIndicator(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.getMaxColumnsToMerge() ) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java index 991cbdd6f50..c0ed2ac01e7 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java @@ -188,33 +188,60 @@ public interface IndexMerger return Lists.newArrayList(retVal); } - File persist( + /** + * Equivalent to {@link #persist(IncrementalIndex, Interval, File, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory)} + * without a progress indicator and with interval set to {@link IncrementalIndex#getInterval()}. + */ + default File persist( IncrementalIndex index, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException; + ) throws IOException + { + return persist( + index, + index.getInterval(), + outDir, + indexSpec, + new BaseProgressIndicator(), + segmentWriteOutMediumFactory + ); + } /** - * This is *not* thread-safe and havok will ensue if this is called and writes are still occurring - * on the IncrementalIndex object. + * Equivalent to {@link #persist(IncrementalIndex, Interval, File, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory)} + * without a progress indicator. + */ + default File persist( + IncrementalIndex index, + Interval dataInterval, + File outDir, + IndexSpec indexSpec, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException + { + return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory); + } + + /** + * Persist an IncrementalIndex to disk in such a way that it can be loaded back up as a {@link QueryableIndex}. * - * @param index the IncrementalIndex to persist - * @param dataInterval the Interval that the data represents - * @param outDir the directory to persist the data to + * This is *not* thread-safe and havoc will ensue if this is called and writes are still occurring on the + * IncrementalIndex object. + * + * @param index the IncrementalIndex to persist + * @param dataInterval the Interval that the data represents. Typically, this is the same as the + * interval from the corresponding {@link org.apache.druid.timeline.SegmentId}. + * @param outDir the directory to persist the data to + * @param indexSpec storage and compression options + * @param progress an object that will receive progress updates + * @param segmentWriteOutMediumFactory controls allocation of temporary data structures * * @return the index output directory * * @throws IOException if an IO error occurs persisting the index */ - File persist( - IncrementalIndex index, - Interval dataInterval, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException; - File persist( IncrementalIndex index, Interval dataInterval, @@ -224,7 +251,14 @@ public interface IndexMerger @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) throws IOException; - File mergeQueryableIndex( + /** + * Merge a collection of {@link QueryableIndex}. + * + * Only used as a convenience method in tests. In production code, use the full version + * {@link #mergeQueryableIndex(List, boolean, AggregatorFactory[], DimensionsSpec, File, IndexSpec, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory, int)}. + */ + @VisibleForTesting + default File mergeQueryableIndex( List indexes, boolean rollup, AggregatorFactory[] metricAggs, @@ -232,19 +266,25 @@ public interface IndexMerger IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge - ) throws IOException; - - File mergeQueryableIndex( - List indexes, - boolean rollup, - AggregatorFactory[] metricAggs, - @Nullable DimensionsSpec dimensionsSpec, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - int maxColumnsToMerge - ) throws IOException; + ) throws IOException + { + return mergeQueryableIndex( + indexes, + rollup, + metricAggs, + null, + outDir, + indexSpec, + indexSpec, + new BaseProgressIndicator(), + segmentWriteOutMediumFactory, + maxColumnsToMerge + ); + } + /** + * Merge a collection of {@link QueryableIndex}. + */ File mergeQueryableIndex( List indexes, boolean rollup, @@ -252,11 +292,20 @@ public interface IndexMerger @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, + IndexSpec indexSpecForIntermediatePersists, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge ) throws IOException; + /** + * Only used as a convenience method in tests. + * + * In production code, to merge multiple {@link QueryableIndex}, use + * {@link #mergeQueryableIndex(List, boolean, AggregatorFactory[], DimensionsSpec, File, IndexSpec, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory, int)}. + * To merge multiple {@link IncrementalIndex}, call one of the {@link #persist} methods and then merge the resulting + * {@link QueryableIndex}. + */ @VisibleForTesting File merge( List indexes, @@ -267,17 +316,6 @@ public interface IndexMerger int maxColumnsToMerge ) throws IOException; - // Faster than IndexMaker - File convert(File inDir, File outDir, IndexSpec indexSpec) throws IOException; - - File append( - List indexes, - AggregatorFactory[] aggregators, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException; - /** * This method applies {@link DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues(int, ColumnValueSelector)} to * all dimension column selectors of the given sourceRowIterator, using the given index number. diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index b433fb9c2b1..92a25f50fb7 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -21,8 +21,6 @@ package org.apache.druid.segment; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.primitives.Ints; @@ -811,29 +809,6 @@ public class IndexMergerV9 implements IndexMerger } } - @Override - public File persist( - final IncrementalIndex index, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException - { - return persist(index, index.getInterval(), outDir, indexSpec, segmentWriteOutMediumFactory); - } - - @Override - public File persist( - final IncrementalIndex index, - final Interval dataInterval, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException - { - return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory); - } - @Override public File persist( final IncrementalIndex index, @@ -879,60 +854,13 @@ public class IndexMergerV9 implements IndexMerger null, outDir, indexSpec, + indexSpec, progress, segmentWriteOutMediumFactory, -1 ); } - @Override - public File mergeQueryableIndex( - List indexes, - boolean rollup, - final AggregatorFactory[] metricAggs, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - int maxColumnsToMerge - ) throws IOException - { - return mergeQueryableIndex( - indexes, - rollup, - metricAggs, - null, - outDir, - indexSpec, - segmentWriteOutMediumFactory, - maxColumnsToMerge - ); - } - - @Override - public File mergeQueryableIndex( - List indexes, - boolean rollup, - final AggregatorFactory[] metricAggs, - @Nullable DimensionsSpec dimensionsSpec, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - int maxColumnsToMerge - ) throws IOException - { - return mergeQueryableIndex( - indexes, - rollup, - metricAggs, - dimensionsSpec, - outDir, - indexSpec, - new BaseProgressIndicator(), - segmentWriteOutMediumFactory, - maxColumnsToMerge - ); - } - @Override public File mergeQueryableIndex( List indexes, @@ -941,6 +869,7 @@ public class IndexMergerV9 implements IndexMerger @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, + IndexSpec indexSpecForIntermediatePersists, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge @@ -953,6 +882,7 @@ public class IndexMergerV9 implements IndexMerger dimensionsSpec, outDir, indexSpec, + indexSpecForIntermediatePersists, progress, segmentWriteOutMediumFactory, maxColumnsToMerge @@ -969,7 +899,18 @@ public class IndexMergerV9 implements IndexMerger int maxColumnsToMerge ) throws IOException { - return multiphaseMerge(indexes, rollup, metricAggs, null, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge); + return multiphaseMerge( + indexes, + rollup, + metricAggs, + null, + outDir, + indexSpec, + indexSpec, + new BaseProgressIndicator(), + null, + maxColumnsToMerge + ); } private File multiphaseMerge( @@ -979,6 +920,7 @@ public class IndexMergerV9 implements IndexMerger @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, + IndexSpec indexSpecForIntermediatePersists, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge @@ -1012,8 +954,9 @@ public class IndexMergerV9 implements IndexMerger while (true) { log.info("Merging %d phases, tiers finished processed so far: %d.", currentPhases.size(), tierCounter); for (List phase : currentPhases) { - File phaseOutDir; - if (currentPhases.size() == 1) { + final File phaseOutDir; + final boolean isFinalPhase = currentPhases.size() == 1; + if (isFinalPhase) { // use the given outDir on the final merge phase phaseOutDir = outDir; log.info("Performing final merge phase."); @@ -1030,7 +973,7 @@ public class IndexMergerV9 implements IndexMerger metricAggs, dimensionsSpec, phaseOutDir, - indexSpec, + isFinalPhase ? indexSpec : indexSpecForIntermediatePersists, progress, segmentWriteOutMediumFactory ); @@ -1186,69 +1129,6 @@ public class IndexMergerV9 implements IndexMerger ); } - @Override - public File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException - { - return convert(inDir, outDir, indexSpec, new BaseProgressIndicator(), defaultSegmentWriteOutMediumFactory); - } - - private File convert( - final File inDir, - final File outDir, - final IndexSpec indexSpec, - final ProgressIndicator progress, - final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException - { - try (QueryableIndex index = indexIO.loadIndex(inDir)) { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); - return makeIndexFiles( - ImmutableList.of(adapter), - null, - outDir, - progress, - Lists.newArrayList(adapter.getDimensionNames()), - Lists.newArrayList(adapter.getMetricNames()), - Iterables::getOnlyElement, - false, - indexSpec, - segmentWriteOutMediumFactory - ); - } - } - - @Override - public File append( - List indexes, - AggregatorFactory[] aggregators, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) throws IOException - { - FileUtils.deleteDirectory(outDir); - FileUtils.mkdirp(outDir); - - final List mergedDimensions = IndexMerger.getMergedDimensions(indexes, null); - - final List mergedMetrics = IndexMerger.mergeIndexed( - indexes.stream().map(IndexableAdapter::getMetricNames).collect(Collectors.toList()) - ); - - return makeIndexFiles( - indexes, - aggregators, - outDir, - new BaseProgressIndicator(), - mergedDimensions, - mergedMetrics, - MergingRowIterator::new, - true, - indexSpec, - segmentWriteOutMediumFactory - ); - } - private Map makeDimensionHandlers( final List mergedDimensions, final List dimCapabilities diff --git a/processing/src/main/java/org/apache/druid/segment/Metadata.java b/processing/src/main/java/org/apache/druid/segment/Metadata.java index 08c29113cba..e8aa5e646de 100644 --- a/processing/src/main/java/org/apache/druid/segment/Metadata.java +++ b/processing/src/main/java/org/apache/druid/segment/Metadata.java @@ -113,14 +113,6 @@ public class Metadata return container.get(key); } - public Metadata put(String key, @Nullable Object value) - { - if (value != null) { - container.put(key, value); - } - return this; - } - // arbitrary key-value pairs from the metadata just follow the semantics of last one wins if same // key exists in multiple input Metadata containers // for others e.g. Aggregators, appropriate merging is done diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java index 6be98b5c3a8..801e19a0d09 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexIndexableAdapter.java @@ -198,7 +198,6 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter private final TimeAndDimsPointer markedRowPointer; boolean first = true; - int memoizedOffset = -1; RowIteratorImpl() { @@ -334,20 +333,6 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter markedMetricSelectors[i].setValueFrom(rowMetricSelectors[i]); } } - - /** - * Used in {@link RowFilteringIndexAdapter} - */ - void memoizeOffset() - { - memoizedOffset = offset.getOffset(); - } - - void resetToMemoizedOffset() - { - offset.setCurrentOffset(memoizedOffset); - setRowPointerValues(); - } } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/RowFilteringIndexAdapter.java b/processing/src/main/java/org/apache/druid/segment/RowFilteringIndexAdapter.java deleted file mode 100644 index 0083c87f773..00000000000 --- a/processing/src/main/java/org/apache/druid/segment/RowFilteringIndexAdapter.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment; - -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.data.BitmapValues; -import org.apache.druid.segment.data.CloseableIndexed; -import org.joda.time.Interval; - -import java.util.List; -import java.util.function.Predicate; - -/** - */ -public class RowFilteringIndexAdapter implements IndexableAdapter -{ - private final QueryableIndexIndexableAdapter baseAdapter; - private final Predicate filter; - - public RowFilteringIndexAdapter(QueryableIndexIndexableAdapter baseAdapter, Predicate filter) - { - this.baseAdapter = baseAdapter; - this.filter = filter; - } - - @Override - public Interval getDataInterval() - { - return baseAdapter.getDataInterval(); - } - - @Override - public int getNumRows() - { - return baseAdapter.getNumRows(); - } - - @Override - public List getDimensionNames() - { - return baseAdapter.getDimensionNames(); - } - - @Override - public List getMetricNames() - { - return baseAdapter.getMetricNames(); - } - - @Override - public > CloseableIndexed getDimValueLookup(String dimension) - { - return baseAdapter.getDimValueLookup(dimension); - } - - @Override - public TransformableRowIterator getRows() - { - QueryableIndexIndexableAdapter.RowIteratorImpl baseRowIterator = baseAdapter.getRows(); - return new ForwardingRowIterator(baseRowIterator) - { - /** - * This memoization is needed to conform to {@link RowIterator#getPointer()} specification. - */ - private boolean memoizedOffset = false; - - @Override - public boolean moveToNext() - { - while (baseRowIterator.moveToNext()) { - if (filter.test(baseRowIterator.getPointer())) { - baseRowIterator.memoizeOffset(); - memoizedOffset = true; - return true; - } - } - // Setting back to the last valid offset in this iterator, as required by RowIterator.getPointer() spec. - if (memoizedOffset) { - baseRowIterator.resetToMemoizedOffset(); - } - return false; - } - }; - } - - @Override - public String getMetricType(String metric) - { - return baseAdapter.getMetricType(metric); - } - - @Override - public ColumnCapabilities getCapabilities(String column) - { - return baseAdapter.getCapabilities(column); - } - - @Override - public BitmapValues getBitmapValues(String dimension, int dictId) - { - return baseAdapter.getBitmapValues(dimension, dictId); - } - - @Override - public Metadata getMetadata() - { - return baseAdapter.getMetadata(); - } -} diff --git a/processing/src/test/java/org/apache/druid/segment/AppendTest.java b/processing/src/test/java/org/apache/druid/segment/AppendTest.java deleted file mode 100644 index e64b06122da..00000000000 --- a/processing/src/test/java/org/apache/druid/segment/AppendTest.java +++ /dev/null @@ -1,699 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.apache.druid.collections.CloseableStupidPool; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.query.Druids; -import org.apache.druid.query.QueryPlus; -import org.apache.druid.query.QueryRunner; -import org.apache.druid.query.QueryRunnerTestHelper; -import org.apache.druid.query.Result; -import org.apache.druid.query.TestQueryRunners; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; -import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; -import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator; -import org.apache.druid.query.aggregation.post.ConstantPostAggregator; -import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator; -import org.apache.druid.query.filter.AndDimFilter; -import org.apache.druid.query.filter.NotDimFilter; -import org.apache.druid.query.filter.OrDimFilter; -import org.apache.druid.query.filter.SelectorDimFilter; -import org.apache.druid.query.search.SearchHit; -import org.apache.druid.query.search.SearchQuery; -import org.apache.druid.query.search.SearchResultValue; -import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; -import org.apache.druid.query.spec.QuerySegmentSpec; -import org.apache.druid.query.timeboundary.TimeBoundaryQuery; -import org.apache.druid.query.timeboundary.TimeBoundaryResultValue; -import org.apache.druid.query.timeseries.TimeseriesQuery; -import org.apache.druid.query.timeseries.TimeseriesResultValue; -import org.apache.druid.query.topn.TopNQuery; -import org.apache.druid.query.topn.TopNQueryBuilder; -import org.apache.druid.query.topn.TopNResultValue; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - */ -@Ignore -public class AppendTest -{ - private static final AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ - new DoubleSumAggregatorFactory("index", "index"), - new CountAggregatorFactory("count"), - new HyperUniquesAggregatorFactory("quality_uniques", "quality") - }; - private static final AggregatorFactory[] METRIC_AGGS_NO_UNIQ = new AggregatorFactory[]{ - new DoubleSumAggregatorFactory("index", "index"), - new CountAggregatorFactory("count") - }; - - final String dataSource = "testing"; - final Granularity ALL_GRAN = Granularities.ALL; - final String marketDimension = "market"; - final String qualityDimension = "quality"; - final String placementDimension = "placement"; - final String placementishDimension = "placementish"; - final String indexMetric = "index"; - final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows"); - final DoubleSumAggregatorFactory indexDoubleSum = new DoubleSumAggregatorFactory("index", "index"); - final HyperUniquesAggregatorFactory uniques = new HyperUniquesAggregatorFactory("uniques", "quality_uniques"); - final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L); - final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows"); - final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index"); - final ArithmeticPostAggregator addRowsIndexConstant = - new ArithmeticPostAggregator( - "addRowsIndexConstant", "+", Lists.newArrayList(constant, rowsPostAgg, indexPostAgg) - ); - final List commonAggregators = Arrays.asList(rowsCount, indexDoubleSum, uniques); - - final QuerySegmentSpec fullOnInterval = new MultipleIntervalSegmentSpec( - Collections.singletonList(Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z")) - ); - - private Segment segment; - private Segment segment2; - private Segment segment3; - - @Before - public void setUp() - { - SchemalessIndexTest schemalessIndexTest = new SchemalessIndexTest(OffHeapMemorySegmentWriteOutMediumFactory.instance()); - // (1, 2) cover overlapping segments of the form - // |------| - // |--------| - QueryableIndex appendedIndex = schemalessIndexTest.getAppendedIncrementalIndex( - Arrays.asList( - new Pair("append.json.1", METRIC_AGGS_NO_UNIQ), - new Pair("append.json.2", METRIC_AGGS) - ), - Arrays.asList( - Intervals.of("2011-01-12T00:00:00.000Z/2011-01-16T00:00:00.000Z"), - Intervals.of("2011-01-14T22:00:00.000Z/2011-01-16T00:00:00.000Z") - ) - ); - segment = new QueryableIndexSegment(appendedIndex, null); - - // (3, 4) cover overlapping segments of the form - // |------------| - // |-----| - QueryableIndex append2 = schemalessIndexTest.getAppendedIncrementalIndex( - Arrays.asList(new Pair<>("append.json.3", METRIC_AGGS_NO_UNIQ), new Pair<>("append.json.4", METRIC_AGGS)), - Arrays.asList( - Intervals.of("2011-01-12T00:00:00.000Z/2011-01-16T00:00:00.000Z"), - Intervals.of("2011-01-13T00:00:00.000Z/2011-01-14T00:00:00.000Z") - ) - ); - segment2 = new QueryableIndexSegment(append2, null); - - // (5, 6, 7) test gaps that can be created in data because of rows being discounted - // |-------------| - // |---| - // |---| - QueryableIndex append3 = schemalessIndexTest.getAppendedIncrementalIndex( - Arrays.asList( - new Pair<>("append.json.5", METRIC_AGGS), - new Pair<>("append.json.6", METRIC_AGGS), - new Pair<>("append.json.7", METRIC_AGGS) - ), - Arrays.asList( - Intervals.of("2011-01-12T00:00:00.000Z/2011-01-22T00:00:00.000Z"), - Intervals.of("2011-01-13T00:00:00.000Z/2011-01-16T00:00:00.000Z"), - Intervals.of("2011-01-18T00:00:00.000Z/2011-01-21T00:00:00.000Z") - ) - ); - segment3 = new QueryableIndexSegment(append3, null); - } - - @Test - public void testTimeBoundary() - { - List> expectedResults = Collections.singletonList( - new Result<>( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, - DateTimes.of("2011-01-12T00:00:00.000Z"), - TimeBoundaryQuery.MAX_TIME, - DateTimes.of("2011-01-15T02:00:00.000Z") - ) - ) - ) - ); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() - .dataSource(dataSource) - .build(); - QueryRunner runner = TestQueryRunners.makeTimeBoundaryQueryRunner(segment); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testTimeBoundary2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeBoundaryResultValue( - ImmutableMap.of( - TimeBoundaryQuery.MIN_TIME, - DateTimes.of("2011-01-12T00:00:00.000Z"), - TimeBoundaryQuery.MAX_TIME, - DateTimes.of("2011-01-15T00:00:00.000Z") - ) - ) - ) - ); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder() - .dataSource(dataSource) - .build(); - QueryRunner runner = TestQueryRunners.makeTimeBoundaryQueryRunner(segment2); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testTimeSeries() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeseriesResultValue( - ImmutableMap.builder() - .put("rows", 8L) - .put("index", 700.0D) - .put("addRowsIndexConstant", 709.0D) - .put("uniques", 1.0002442201269182D) - .put("maxIndex", 100.0D) - .put("minIndex", 0.0D) - .build() - ) - ) - ); - - TimeseriesQuery query = makeTimeseriesQuery(); - QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testTimeSeries2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeseriesResultValue( - ImmutableMap.builder() - .put("rows", 7L) - .put("index", 500.0D) - .put("addRowsIndexConstant", 508.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0D) - .put("minIndex", 0.0D) - .build() - ) - ) - ); - - TimeseriesQuery query = makeTimeseriesQuery(); - QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment2); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testFilteredTimeSeries() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeseriesResultValue( - ImmutableMap.builder() - .put("rows", 5L) - .put("index", 500.0D) - .put("addRowsIndexConstant", 506.0D) - .put("uniques", 1.0002442201269182D) - .put("maxIndex", 100.0D) - .put("minIndex", 100.0D) - .build() - ) - ) - ); - - TimeseriesQuery query = makeFilteredTimeseriesQuery(); - QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testFilteredTimeSeries2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeseriesResultValue( - ImmutableMap.builder() - .put("rows", 4L) - .put("index", 400.0D) - .put("addRowsIndexConstant", 405.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0D) - .put("minIndex", 100.0D) - .build() - ) - ) - ); - - TimeseriesQuery query = makeFilteredTimeseriesQuery(); - QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment2); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testTopNSeries() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TopNResultValue( - Arrays.asList( - ImmutableMap.builder() - .put("market", "spot") - .put("rows", 3L) - .put("index", 300.0D) - .put("addRowsIndexConstant", 304.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0) - .put("minIndex", 100.0) - .build(), - QueryRunnerTestHelper.orderedMap( - "market", null, - "rows", 3L, - "index", 200.0D, - "addRowsIndexConstant", 204.0D, - "uniques", 0.0D, - "maxIndex", 100.0, - "minIndex", 0.0 - ), - ImmutableMap.builder() - .put("market", "total_market") - .put("rows", 2L) - .put("index", 200.0D) - .put("addRowsIndexConstant", 203.0D) - .put("uniques", 1.0002442201269182D) - .put("maxIndex", 100.0D) - .put("minIndex", 100.0D) - .build() - ) - ) - ) - ); - - TopNQuery query = makeTopNQuery(); - try (CloseableStupidPool pool = TestQueryRunners.createDefaultNonBlockingPool()) { - QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment, pool); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - } - - @Test - public void testTopNSeries2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TopNResultValue( - Arrays.asList( - ImmutableMap.builder() - .put("market", "total_market") - .put("rows", 3L) - .put("index", 300.0D) - .put("addRowsIndexConstant", 304.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0D) - .put("minIndex", 100.0D) - .build(), - QueryRunnerTestHelper.orderedMap( - "market", null, - "rows", 3L, - "index", 100.0D, - "addRowsIndexConstant", 104.0D, - "uniques", 0.0D, - "maxIndex", 100.0, - "minIndex", 0.0 - ), - ImmutableMap.builder() - .put("market", "spot") - .put("rows", 1L) - .put("index", 100.0D) - .put("addRowsIndexConstant", 102.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0) - .put("minIndex", 100.0) - .build() - ) - ) - ) - ); - - TopNQuery query = makeTopNQuery(); - try (CloseableStupidPool pool = TestQueryRunners.createDefaultNonBlockingPool()) { - QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2, pool); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - } - - @Test - public void testFilteredTopNSeries() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TopNResultValue( - Collections.>singletonList( - ImmutableMap.builder() - .put("market", "spot") - .put("rows", 1L) - .put("index", 100.0D) - .put("addRowsIndexConstant", 102.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0) - .put("minIndex", 100.0) - .build() - ) - ) - ) - ); - - TopNQuery query = makeFilteredTopNQuery(); - try (CloseableStupidPool pool = TestQueryRunners.createDefaultNonBlockingPool()) { - QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment, pool); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - } - - @Test - public void testFilteredTopNSeries2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TopNResultValue( - new ArrayList>() - ) - ) - ); - - TopNQuery query = makeFilteredTopNQuery(); - try (CloseableStupidPool pool = TestQueryRunners.createDefaultNonBlockingPool()) { - QueryRunner runner = TestQueryRunners.makeTopNQueryRunner(segment2, pool); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - } - - @Test - public void testSearch() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new SearchResultValue( - Arrays.asList( - new SearchHit(placementishDimension, "a"), - new SearchHit(qualityDimension, "automotive"), - new SearchHit(placementDimension, "mezzanine"), - new SearchHit(marketDimension, "total_market") - ) - ) - ) - ); - - SearchQuery query = makeSearchQuery(); - QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testSearchWithOverlap() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new SearchResultValue( - Arrays.asList( - new SearchHit(placementishDimension, "a"), - new SearchHit(placementDimension, "mezzanine"), - new SearchHit(marketDimension, "total_market") - ) - ) - ) - ); - - SearchQuery query = makeSearchQuery(); - QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment2); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testFilteredSearch() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new SearchResultValue( - Arrays.asList( - new SearchHit(placementDimension, "mezzanine"), - new SearchHit(marketDimension, "total_market") - ) - ) - ) - ); - - SearchQuery query = makeFilteredSearchQuery(); - QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testFilteredSearch2() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new SearchResultValue( - Arrays.asList( - new SearchHit(placementishDimension, "a"), - new SearchHit(placementDimension, "mezzanine"), - new SearchHit(marketDimension, "total_market") - ) - ) - ) - ); - - SearchQuery query = makeFilteredSearchQuery(); - QueryRunner runner = TestQueryRunners.makeSearchQueryRunner(segment2); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - @Test - public void testRowFiltering() - { - List> expectedResults = Collections.singletonList( - new Result( - DateTimes.of("2011-01-12T00:00:00.000Z"), - new TimeseriesResultValue( - ImmutableMap.builder() - .put("rows", 5L) - .put("index", 500.0D) - .put("addRowsIndexConstant", 506.0D) - .put("uniques", 0.0D) - .put("maxIndex", 100.0D) - .put("minIndex", 100.0D) - .build() - ) - ) - ); - - TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .intervals(fullOnInterval) - .filters(marketDimension, "breakstuff") - .aggregators( - Lists.newArrayList( - Iterables.concat( - commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index") - ) - ) - ) - ) - .postAggregators(addRowsIndexConstant) - .build(); - QueryRunner runner = TestQueryRunners.makeTimeSeriesQueryRunner(segment3); - TestHelper.assertExpectedResults(expectedResults, runner.run(QueryPlus.wrap(query))); - } - - private TimeseriesQuery makeTimeseriesQuery() - { - return Druids.newTimeseriesQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .intervals(fullOnInterval) - .aggregators( - Lists.newArrayList( - Iterables.concat( - commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index") - ) - ) - ) - ) - .postAggregators(addRowsIndexConstant) - .build(); - } - - private TimeseriesQuery makeFilteredTimeseriesQuery() - { - return Druids.newTimeseriesQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .intervals(fullOnInterval) - .filters( - new OrDimFilter( - new SelectorDimFilter(marketDimension, "spot", null), - new SelectorDimFilter(marketDimension, "total_market", null) - ) - ) - .aggregators( - Lists.newArrayList( - Iterables.concat( - commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index") - ) - ) - ) - ) - .postAggregators(addRowsIndexConstant) - .build(); - } - - private TopNQuery makeTopNQuery() - { - return new TopNQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .dimension(marketDimension) - .metric(indexMetric) - .threshold(3) - .intervals(fullOnInterval) - .aggregators( - Lists.newArrayList( - Iterables.concat( - commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index") - ) - ) - ) - ) - .postAggregators(addRowsIndexConstant) - .build(); - } - - private TopNQuery makeFilteredTopNQuery() - { - return new TopNQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .dimension(marketDimension) - .metric(indexMetric) - .threshold(3) - .filters( - new AndDimFilter( - new SelectorDimFilter(marketDimension, "spot", null), - new SelectorDimFilter(placementDimension, "preferred", null) - ) - ) - .intervals(fullOnInterval) - .aggregators( - Lists.newArrayList( - Iterables.concat( - commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index") - ) - ) - ) - ) - .postAggregators(addRowsIndexConstant) - .build(); - } - - private SearchQuery makeSearchQuery() - { - return Druids.newSearchQueryBuilder() - .dataSource(dataSource) - .granularity(ALL_GRAN) - .intervals(fullOnInterval) - .query("a") - .build(); - } - - private SearchQuery makeFilteredSearchQuery() - { - return Druids.newSearchQueryBuilder() - .dataSource(dataSource) - .filters(new NotDimFilter(new SelectorDimFilter(marketDimension, "spot", null))) - .granularity(ALL_GRAN) - .intervals(fullOnInterval) - .query("a") - .build(); - } -} diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index fc7bc7ec54c..55a4488012e 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -22,7 +22,6 @@ package org.apache.druid.segment; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequences; @@ -178,11 +177,8 @@ public class IndexBuilder ); } final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex( - indexMerger.merge( - Lists.transform( - persisted, - QueryableIndexIndexableAdapter::new - ), + indexMerger.mergeQueryableIndex( + persisted, true, Iterables.toArray( Iterables.transform( @@ -191,8 +187,12 @@ public class IndexBuilder ), AggregatorFactory.class ), + null, new File(tmpDir, StringUtils.format("testIndex-%s", UUID.randomUUID())), indexSpec, + indexSpec, + new BaseProgressIndicator(), + null, -1 ) ); diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java index 0929e79b106..063ac799db8 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java @@ -460,68 +460,6 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest assertDimCompression(merged, indexSpec.getDimensionCompression()); } - @Test - public void testAppendRetainsValues() throws Exception - { - final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File mergedDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - indexIO.loadIndex(indexMerger.append(ImmutableList.of(incrementalAdapter), null, tempDir1, indexSpec, null)) - ); - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - indexIO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(3, index1.getColumnNames().size()); - - Assert.assertArrayEquals( - IncrementalIndexTest.getDefaultCombiningAggregatorFactories(), - index1.getMetadata().getAggregators() - ); - - AggregatorFactory[] mergedAggregators = new AggregatorFactory[]{new CountAggregatorFactory("count")}; - QueryableIndex merged = closer.closeLater( - indexIO.loadIndex( - indexMerger.mergeQueryableIndex( - ImmutableList.of(index1), - true, - mergedAggregators, - mergedDir, - indexSpec, - null, - -1 - ) - ) - ); - - Assert.assertEquals(2, merged.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(merged.getAvailableDimensions())); - Assert.assertEquals(3, merged.getColumnNames().size()); - - indexIO.validateTwoSegments(tempDir1, mergedDir); - - assertDimCompression(index1, indexSpec.getDimensionCompression()); - assertDimCompression(merged, indexSpec.getDimensionCompression()); - - Assert.assertArrayEquals( - getCombiningAggregators(mergedAggregators), - merged.getMetadata().getAggregators() - ); - } - @Test public void testMergeSpecChange() throws Exception { @@ -589,134 +527,6 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest assertDimCompression(merged, newSpec.getDimensionCompression()); } - - @Test - public void testConvertSame() throws Exception - { - final long timestamp = System.currentTimeMillis(); - final AggregatorFactory[] aggregators = new AggregatorFactory[]{ - new LongSumAggregatorFactory( - "longSum1", - "dim1" - ), - new LongSumAggregatorFactory("longSum2", "dim2") - }; - - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File convertDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - indexIO.loadIndex(indexMerger.persist(toPersist1, tempDir1, indexSpec, null)) - ); - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - indexIO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(4, index1.getColumnNames().size()); - - - QueryableIndex converted = closer.closeLater( - indexIO.loadIndex(indexMerger.convert(tempDir1, convertDir, indexSpec)) - ); - - Assert.assertEquals(2, converted.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); - Assert.assertEquals(4, converted.getColumnNames().size()); - - indexIO.validateTwoSegments(tempDir1, convertDir); - - assertDimCompression(index1, indexSpec.getDimensionCompression()); - assertDimCompression(converted, indexSpec.getDimensionCompression()); - - Assert.assertArrayEquals( - getCombiningAggregators(aggregators), - converted.getMetadata().getAggregators() - ); - } - - - @Test - public void testConvertDifferent() throws Exception - { - final long timestamp = System.currentTimeMillis(); - final AggregatorFactory[] aggregators = new AggregatorFactory[]{ - new LongSumAggregatorFactory( - "longSum1", - "dim1" - ), - new LongSumAggregatorFactory("longSum2", "dim2") - }; - - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(aggregators); - IncrementalIndexTest.populateIndex(timestamp, toPersist1); - - final File tempDir1 = temporaryFolder.newFolder(); - final File convertDir = temporaryFolder.newFolder(); - final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter( - toPersist1.getInterval(), - toPersist1, - indexSpec.getBitmapSerdeFactory() - .getBitmapFactory() - ); - - QueryableIndex index1 = closer.closeLater( - indexIO.loadIndex(indexMerger.persist(toPersist1, tempDir1, indexSpec, null)) - ); - - - final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); - - indexIO.validateTwoSegments(incrementalAdapter, queryableAdapter); - - Assert.assertEquals(2, index1.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); - Assert.assertEquals(4, index1.getColumnNames().size()); - - - IndexSpec newSpec = new IndexSpec( - indexSpec.getBitmapSerdeFactory(), - CompressionStrategy.LZ4.equals(indexSpec.getDimensionCompression()) ? - CompressionStrategy.LZF : - CompressionStrategy.LZ4, - CompressionStrategy.LZ4.equals(indexSpec.getDimensionCompression()) ? - CompressionStrategy.LZF : - CompressionStrategy.LZ4, - CompressionFactory.LongEncodingStrategy.LONGS.equals(indexSpec.getLongEncoding()) ? - CompressionFactory.LongEncodingStrategy.AUTO : - CompressionFactory.LongEncodingStrategy.LONGS - ); - - QueryableIndex converted = closer.closeLater( - indexIO.loadIndex(indexMerger.convert(tempDir1, convertDir, newSpec)) - ); - - Assert.assertEquals(2, converted.getColumnHolder(ColumnHolder.TIME_COLUMN_NAME).getLength()); - Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(converted.getAvailableDimensions())); - Assert.assertEquals(4, converted.getColumnNames().size()); - - indexIO.validateTwoSegments(tempDir1, convertDir); - - assertDimCompression(index1, indexSpec.getDimensionCompression()); - assertDimCompression(converted, newSpec.getDimensionCompression()); - - Assert.assertArrayEquals( - getCombiningAggregators(aggregators), - converted.getMetadata().getAggregators() - ); - } - private void assertDimCompression(QueryableIndex index, CompressionStrategy expectedStrategy) throws Exception { diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java deleted file mode 100644 index 4f616460fa5..00000000000 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.segment; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteSource; -import com.google.common.io.Files; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.MapBasedInputRow; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.JodaUtils; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.segment.data.CompressionFactory; -import org.apache.druid.segment.data.CompressionStrategy; -import org.apache.druid.segment.data.ConciseBitmapSerdeFactory; -import org.apache.druid.segment.incremental.IncrementalIndex; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.incremental.OnheapIncrementalIndex; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; -import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@RunWith(Parameterized.class) -public class IndexMergerV9CompatibilityTest -{ - @Parameterized.Parameters - public static Collection constructorFeeder() - { - return ImmutableList.of( - new Object[] {TmpFileSegmentWriteOutMediumFactory.instance()}, - new Object[] {OffHeapMemorySegmentWriteOutMediumFactory.instance()} - ); - } - - private static final long TIMESTAMP = DateTimes.of("2014-01-01").getMillis(); - private static final AggregatorFactory[] DEFAULT_AGG_FACTORIES = new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - }; - - private static final IndexSpec INDEX_SPEC = IndexMergerTestBase.makeIndexSpec( - new ConciseBitmapSerdeFactory(), - CompressionStrategy.LZ4, - CompressionStrategy.LZ4, - CompressionFactory.LongEncodingStrategy.LONGS - ); - private static final List DIMS = ImmutableList.of("dim0", "dim1"); - - private final Collection events; - @Rule - public final CloserRule closer = new CloserRule(false); - private final IndexMerger indexMerger; - private final IndexIO indexIO; - - public IndexMergerV9CompatibilityTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory) - { - indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory); - indexIO = TestHelper.getTestIndexIO(); - events = new ArrayList<>(); - - final Map map1 = ImmutableMap.of( - DIMS.get(0), ImmutableList.of("dim00", "dim01"), - DIMS.get(1), "dim10" - ); - - final List nullList = Collections.singletonList(null); - - final Map map2 = ImmutableMap.of( - DIMS.get(0), nullList, - DIMS.get(1), "dim10" - ); - - final Map map3 = ImmutableMap.of( - DIMS.get(0), - ImmutableList.of("dim00", "dim01") - ); - - final Map map4 = ImmutableMap.of(); - - final Map map5 = ImmutableMap.of(DIMS.get(1), "dim10"); - - final Map map6 = new HashMap<>(); - map6.put(DIMS.get(1), null); // ImmutableMap cannot take null - - int i = 0; - for (final Map map : Arrays.asList(map1, map2, map3, map4, map5, map6)) { - events.add(new MapBasedInputRow(TIMESTAMP + i++, DIMS, map)); - } - } - - IncrementalIndex toPersist; - File tmpDir; - File persistTmpDir; - - @Before - public void setUp() throws IOException - { - toPersist = new OnheapIncrementalIndex.Builder() - .setIndexSchema( - new IncrementalIndexSchema.Builder() - .withMinTimestamp(JodaUtils.MIN_INSTANT) - .withMetrics(DEFAULT_AGG_FACTORIES) - .build() - ) - .setMaxRowCount(1000000) - .build(); - - toPersist.getMetadata().put("key", "value"); - for (InputRow event : events) { - toPersist.add(event); - } - tmpDir = FileUtils.createTempDir(); - persistTmpDir = new File(tmpDir, "persistDir"); - FileUtils.mkdirp(persistTmpDir); - String[] files = new String[] {"00000.smoosh", "meta.smoosh", "version.bin"}; - for (String file : files) { - new ByteSource() - { - @Override - public InputStream openStream() - { - return IndexMergerV9CompatibilityTest.class.getResourceAsStream("/v8SegmentPersistDir/" + file); - } - }.copyTo(Files.asByteSink(new File(persistTmpDir, file))); - } - } - - @After - public void tearDown() throws IOException - { - FileUtils.deleteDirectory(tmpDir); - } - - @Test - public void testPersistWithSegmentMetadata() throws IOException - { - File outDir = FileUtils.createTempDir(); - QueryableIndex index = null; - try { - outDir = FileUtils.createTempDir(); - index = indexIO.loadIndex(indexMerger.persist(toPersist, outDir, INDEX_SPEC, null)); - - Assert.assertEquals("value", index.getMetadata().get("key")); - } - finally { - if (index != null) { - index.close(); - } - - if (outDir != null) { - FileUtils.deleteDirectory(outDir); - } - } - } - - @Test - public void testSimpleReprocess() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - indexIO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); - } - - private File reprocessAndValidate(File inDir, File tmpDir) throws IOException - { - final File outDir = indexMerger.convert( - inDir, - tmpDir, - INDEX_SPEC - ); - indexIO.validateTwoSegments(persistTmpDir, outDir); - return outDir; - } - - @Test - public void testIdempotentReprocess() throws IOException - { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( - closer.closeLater( - indexIO.loadIndex( - persistTmpDir - ) - ) - ); - Assert.assertEquals(events.size(), adapter.getNumRows()); - final File tmpDir1 = new File(tmpDir, "reprocessed1"); - reprocessAndValidate(persistTmpDir, tmpDir1); - - final File tmpDir2 = new File(tmpDir, "reprocessed2"); - final IndexableAdapter adapter2 = new QueryableIndexIndexableAdapter(closer.closeLater(indexIO.loadIndex(tmpDir1))); - Assert.assertEquals(events.size(), adapter2.getNumRows()); - reprocessAndValidate(tmpDir1, tmpDir2); - - final File tmpDir3 = new File(tmpDir, "reprocessed3"); - final IndexableAdapter adapter3 = new QueryableIndexIndexableAdapter(closer.closeLater(indexIO.loadIndex(tmpDir2))); - Assert.assertEquals(events.size(), adapter3.getNumRows()); - reprocessAndValidate(tmpDir2, tmpDir3); - } -} diff --git a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java index 1352a115add..a44a04cc6ef 100644 --- a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java @@ -27,10 +27,8 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -44,13 +42,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.timeline.Overshadowable; -import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.apache.druid.timeline.partition.NoneShardSpec; -import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.DateTime; -import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; import javax.annotation.Nullable; @@ -314,14 +306,6 @@ public class SchemalessIndexTest } } - public QueryableIndex getAppendedIncrementalIndex( - Iterable> files, - List intervals - ) - { - return makeAppendedMMappedIndex(files, intervals); - } - public QueryableIndex getMergedIncrementalIndexDiffMetrics() { return getMergedIncrementalIndex( @@ -463,74 +447,6 @@ public class SchemalessIndexTest return filesToMap; } - private QueryableIndex makeAppendedMMappedIndex( - Iterable> files, - final List intervals - ) - { - try { - File tmpFile = File.createTempFile("yay", "boo"); - tmpFile.delete(); - File mergedFile = new File(tmpFile, "merged"); - FileUtils.mkdirp(mergedFile); - mergedFile.deleteOnExit(); - - List filesToMap = makeFilesToMap(tmpFile, files); - - VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>( - Comparators.naturalNullsFirst() - ); - - ShardSpec noneShardSpec = NoneShardSpec.instance(); - - for (int i = 0; i < intervals.size(); i++) { - timeline.add(intervals.get(i), i, noneShardSpec.createChunk(new OvershadowableFile(i, filesToMap.get(i)))); - } - - final List adapters = Lists.newArrayList( - Iterables.concat( - // TimelineObjectHolder is actually an iterable of iterable of indexable adapters - Iterables.transform( - timeline.lookup(Intervals.of("1000-01-01/3000-01-01")), - new Function, Iterable>() - { - @Override - public Iterable apply(final TimelineObjectHolder timelineObjectHolder) - { - return Iterables.transform( - timelineObjectHolder.getObject(), - - // Each chunk can be used to build the actual IndexableAdapter - new Function, IndexableAdapter>() - { - @Override - public IndexableAdapter apply(PartitionChunk chunk) - { - try { - return new RowFilteringIndexAdapter( - new QueryableIndexIndexableAdapter(indexIO.loadIndex(chunk.getObject().file)), - rowPointer -> timelineObjectHolder.getInterval().contains(rowPointer.getTimestamp()) - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - } - ); - } - } - ) - ) - ); - - return indexIO.loadIndex(indexMerger.append(adapters, null, mergedFile, INDEX_SPEC, null)); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - private QueryableIndex makeMergedMMappedIndex(Iterable> files) { try { diff --git a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java index 3170507b2bd..2ce9296e58a 100644 --- a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java +++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java @@ -29,10 +29,10 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.QueryableIndexIndexableAdapter; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -48,7 +48,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class SegmentGenerator implements Closeable { @@ -175,19 +174,25 @@ public class SegmentGenerator implements Closeable throw new ISE("No rows to index?"); } else { try { + final IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); + retVal = TestHelper .getTestIndexIO() .loadIndex( TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .merge( - indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList()), + .mergeQueryableIndex( + indexes, false, schemaInfo.getAggs() .stream() .map(AggregatorFactory::getCombiningFactory) .toArray(AggregatorFactory[]::new), + null, outDir, - new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null), + indexSpec, + indexSpec, + new BaseProgressIndicator(), + null, -1 ) ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 4b599ee8b83..f0ee59f59e0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -55,6 +55,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -922,6 +923,8 @@ public class AppenderatorImpl implements Appenderator schema.getDimensionsSpec(), mergedTarget, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), + new BaseProgressIndicator(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.getMaxColumnsToMerge() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java index 7b91ac42ec1..ae5b06fcb39 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java @@ -50,6 +50,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -794,6 +795,8 @@ public class BatchAppenderator implements Appenderator schema.getDimensionsSpec(), mergedTarget, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), + new BaseProgressIndicator(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.getMaxColumnsToMerge() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index dbdddf75577..26b492ae149 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -55,6 +55,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -858,6 +859,8 @@ public class StreamAppenderator implements Appenderator schema.getDimensionsSpec(), mergedTarget, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), + new BaseProgressIndicator(), tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.getMaxColumnsToMerge() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index a815f19f1f0..5a7a6141760 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -47,6 +47,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexSpec; @@ -70,12 +71,10 @@ import org.joda.time.Period; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; /** * Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in @@ -436,9 +435,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager { taskAppenderatorMap.computeIfAbsent( taskId, - (myTaskId) -> { - return new ArrayList<>(); - } + myTaskId -> new ArrayList<>() ).add(appenderator); } } @@ -586,103 +583,26 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager this.mergeExecutor = mergeExecutor; } - @Override - public File mergeQueryableIndex( - List indexes, - boolean rollup, - AggregatorFactory[] metricAggs, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - int maxColumnsToMerge - ) - { - return mergeQueryableIndex( - indexes, - rollup, - metricAggs, - null, - outDir, - indexSpec, - segmentWriteOutMediumFactory, - maxColumnsToMerge - ); - } - - @Override - public File mergeQueryableIndex( - List indexes, - boolean rollup, - AggregatorFactory[] metricAggs, - @Nullable DimensionsSpec dimensionsSpec, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - int maxColumnsToMerge - ) - { - ListenableFuture mergeFuture = mergeExecutor.submit( - new Callable() - { - @Override - public File call() - { - try { - return baseMerger.mergeQueryableIndex( - indexes, - rollup, - metricAggs, - dimensionsSpec, - outDir, - indexSpec, - segmentWriteOutMediumFactory, - maxColumnsToMerge - ); - } - catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - } - ); - - try { - return mergeFuture.get(); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - @Override public File persist( IncrementalIndex index, Interval dataInterval, File outDir, IndexSpec indexSpec, + ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) { ListenableFuture mergeFuture = mergeExecutor.submit( - new Callable() - { - @Override - public File call() - { - try { - return baseMerger.persist( - index, - dataInterval, - outDir, - indexSpec, - segmentWriteOutMediumFactory - ); - } - catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - } + () -> + baseMerger.persist( + index, + dataInterval, + outDir, + indexSpec, + progress, + segmentWriteOutMediumFactory + ) ); try { @@ -703,52 +623,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager int maxColumnsToMerge ) { - throw new UOE(ERROR_MSG); - } - - @Override - public File convert( - File inDir, - File outDir, - IndexSpec indexSpec - ) - { - throw new UOE(ERROR_MSG); - } - - @Override - public File append( - List indexes, - AggregatorFactory[] aggregators, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) - { - throw new UOE(ERROR_MSG); - } - - @Override - public File persist( - IncrementalIndex index, - File outDir, - IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) - { - throw new UOE(ERROR_MSG); - } - - @Override - public File persist( - IncrementalIndex index, - Interval dataInterval, - File outDir, - IndexSpec indexSpec, - ProgressIndicator progress, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory - ) - { + // Only used in certain tests. No need to implement. throw new UOE(ERROR_MSG); } @@ -760,12 +635,34 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager @Nullable DimensionsSpec dimensionsSpec, File outDir, IndexSpec indexSpec, + IndexSpec indexSpecForIntermediatePersists, ProgressIndicator progress, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, int maxColumnsToMerge ) { - throw new UOE(ERROR_MSG); + ListenableFuture mergeFuture = mergeExecutor.submit( + () -> + baseMerger.mergeQueryableIndex( + indexes, + rollup, + metricAggs, + dimensionsSpec, + outDir, + indexSpec, + indexSpecForIntermediatePersists, + new BaseProgressIndicator(), + segmentWriteOutMediumFactory, + maxColumnsToMerge + ) + ); + + try { + return mergeFuture.get(); + } + catch (Exception e) { + throw new RuntimeException(e); + } } } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 34daf3af566..757266c3a5e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -52,6 +52,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.BaseProgressIndicator; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.Metadata; @@ -443,8 +444,11 @@ public class RealtimePlumber implements Plumber indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), + null, mergedTarget, config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), + new BaseProgressIndicator(), config.getSegmentWriteOutMediumFactory(), -1 ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java index ade737f8653..8f95eacb7ce 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -20,9 +20,11 @@ package org.apache.druid.segment.realtime.appenderator; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.MapCache; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.Intervals; @@ -30,9 +32,16 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DirectQueryProcessingPool; import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.IndexableAdapter; +import org.apache.druid.segment.ProgressIndicator; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.NoopRowIngestionMeters; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.indexing.DataSchema; @@ -41,17 +50,24 @@ import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.NoopDataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; import java.util.Collections; +import java.util.List; -public class UnifiedIndexerAppenderatorsManagerTest +public class UnifiedIndexerAppenderatorsManagerTest extends InitializedNullHandlingTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -127,4 +143,213 @@ public class UnifiedIndexerAppenderatorsManagerTest manager.getBundle(query); } + + @Test + public void test_removeAppenderatorsForTask() + { + Assert.assertEquals(ImmutableSet.of("myDataSource"), manager.getDatasourceBundles().keySet()); + manager.removeAppenderatorsForTask("taskId", "myDataSource"); + Assert.assertTrue(manager.getDatasourceBundles().isEmpty()); + } + + @Test + public void test_removeAppenderatorsForTask_withoutCreate() + { + // Not all tasks use Appenderators. "remove" may be called without "create", and nothing bad should happen. + manager.removeAppenderatorsForTask("someOtherTaskId", "someOtherDataSource"); + manager.removeAppenderatorsForTask("someOtherTaskId", "myDataSource"); + + // Should be no change. + Assert.assertEquals(ImmutableSet.of("myDataSource"), manager.getDatasourceBundles().keySet()); + } + + @Test + public void test_limitedPool_persist() throws IOException + { + final UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger limitedPoolIndexMerger = + new UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger( + new NoopIndexMerger(), + DirectQueryProcessingPool.INSTANCE + ); + + final File file = new File("xyz"); + + // Three forms of persist. + + Assert.assertEquals(file, limitedPoolIndexMerger.persist(null, null, file, null, null, null)); + Assert.assertEquals(file, limitedPoolIndexMerger.persist(null, null, file, null, null)); + + // Need a mocked index for this test, since getInterval is called on it. + final IncrementalIndex index = EasyMock.createMock(IncrementalIndex.class); + EasyMock.expect(index.getInterval()).andReturn(null); + EasyMock.replay(index); + Assert.assertEquals(file, limitedPoolIndexMerger.persist(index, file, null, null)); + EasyMock.verify(index); + } + + @Test + public void test_limitedPool_persistFail() + { + final UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger limitedPoolIndexMerger = + new UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger( + new NoopIndexMerger(true), + DirectQueryProcessingPool.INSTANCE + ); + + final File file = new File("xyz"); + + Assert.assertThrows( + "failed", + RuntimeException.class, // Wrapped IOException + () -> limitedPoolIndexMerger.persist(null, null, file, null, null, null) + ); + } + + @Test + public void test_limitedPool_mergeQueryableIndexFail() + { + final UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger limitedPoolIndexMerger = + new UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger( + new NoopIndexMerger(true), + DirectQueryProcessingPool.INSTANCE + ); + + final File file = new File("xyz"); + + Assert.assertThrows( + "failed", + RuntimeException.class, // Wrapped IOException + () -> limitedPoolIndexMerger.mergeQueryableIndex( + null, + false, + null, + null, + file, + null, + null, + null, + null, + -1 + ) + ); + } + + @Test + public void test_limitedPool_mergeQueryableIndex() throws IOException + { + final UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger limitedPoolIndexMerger = + new UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger( + new NoopIndexMerger(), + DirectQueryProcessingPool.INSTANCE + ); + + final File file = new File("xyz"); + + // Two forms of mergeQueryableIndex + Assert.assertEquals(file, limitedPoolIndexMerger.mergeQueryableIndex(null, false, null, file, null, null, -1)); + Assert.assertEquals( + file, + limitedPoolIndexMerger.mergeQueryableIndex( + null, + false, + null, + null, + file, + null, + null, + null, + null, + -1 + ) + ); + } + + @Test + public void test_limitedPool_merge() + { + final UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger limitedPoolIndexMerger = + new UnifiedIndexerAppenderatorsManager.LimitedPoolIndexMerger( + new NoopIndexMerger(), + DirectQueryProcessingPool.INSTANCE + ); + + final File file = new File("xyz"); + + // "merge" is neither necessary nor implemented + expectedException.expect(UnsupportedOperationException.class); + Assert.assertEquals(file, limitedPoolIndexMerger.merge(null, false, null, file, null, -1)); + } + + /** + * An {@link IndexMerger} that does nothing, but is useful for LimitedPoolIndexMerger tests. + */ + private static class NoopIndexMerger implements IndexMerger + { + private final boolean failCalls; + + public NoopIndexMerger(boolean failCalls) + { + this.failCalls = failCalls; + } + + public NoopIndexMerger() + { + this(false); + } + + @Override + public File persist( + IncrementalIndex index, + Interval dataInterval, + File outDir, + IndexSpec indexSpec, + ProgressIndicator progress, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException + { + if (failCalls) { + throw new IOException("failed"); + } + + return outDir; + } + + @Override + public File mergeQueryableIndex( + List indexes, + boolean rollup, + AggregatorFactory[] metricAggs, + @Nullable DimensionsSpec dimensionsSpec, + File outDir, + IndexSpec indexSpec, + IndexSpec indexSpecForIntermediatePersists, + ProgressIndicator progress, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge + ) throws IOException + { + if (failCalls) { + throw new IOException("failed"); + } + + return outDir; + } + + @Override + public File merge( + List indexes, + boolean rollup, + AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + int maxColumnsToMerge + ) throws IOException + { + if (failCalls) { + throw new IOException("failed"); + } + + return outDir; + } + } }