From 0460d45e92a15ebdadb4455afa16a7d977f74388 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Fri, 15 Apr 2022 09:08:06 -0700 Subject: [PATCH] Make tombstones ingestible by having them return an empty result set. (#12392) * Make tombstones ingestible by having them return an empty result set. * Spotbug * Coverage * Coverage * Remove unnecessary exception (checkstyle) * Fix integration test and add one more to test dropExisting set to false over tombstones * Force dropExisting to true in auto-compaction when the interval contains only tombstones * Checkstyle, fix unit test * Changed flag by mistake, fixing it * Remove method from interface since this method is specific to only DruidSegmentInputentity * Fix typo * Adapt to latest code * Update comments when only tombstones to compact * Move empty iterator to a new DruidTombstoneSegmentReader * Code review feedback * Checkstyle * Review feedback * Coverage --- .../apache/druid/common/guava/GuavaUtils.java | 5 +- .../apache/druid/data/input/InputEntity.java | 1 + .../indexing/common/task/CompactionTask.java | 165 ++++++++------- .../indexing/input/DruidInputSource.java | 1 - .../input/DruidSegmentInputEntity.java | 6 + .../input/DruidSegmentInputFormat.java | 31 ++- .../indexing/input/DruidSegmentReader.java | 6 +- .../input/DruidTombstoneSegmentReader.java | 88 ++++++++ .../common/task/CompactionTaskRunTest.java | 116 +++++++++++ .../DimensionCardinalityReportTest.java | 114 ++++++++++ .../input/DruidSegmentInputFormatTest.java | 88 ++++++++ .../input/DruidSegmentReaderTest.java | 114 +++++++++- .../duty/ITAutoCompactionTest.java | 195 ++++++++++++++++-- .../coordinator/duty/CompactSegments.java | 20 ++ 14 files changed, 845 insertions(+), 105 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index b3455393e72..f8b9dd03b72 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -71,7 +71,7 @@ public class GuavaUtils /** * If first argument is not null, return it, else return the other argument. Sort of like - * {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are + * {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are * null. */ @Nullable @@ -85,7 +85,8 @@ public class GuavaUtils /** * Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture} - * automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of + * automatically. Especially when we call + * {@link static ListenableFuture> com.google.common.util.concurrent.Futures#allAsList(Iterable> futures)} to create a batch of * future. 
* @param mayInterruptIfRunning {@code true} if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index 343e245e22a..6765ae82717 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -136,4 +136,5 @@ public interface InputEntity { return Predicates.alwaysFalse(); } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 66e0c16a707..067f75ec7d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.NonnullPair; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; @@ -111,6 +112,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -550,6 +552,7 @@ public class CompactionTask extends AbstractBatchIndexTask segmentProvider, lockGranularityInUse ); + final Map segmentFileMap = pair.lhs; final List> timelineSegments = pair.rhs; @@ -557,9 +560,10 @@ public class CompactionTask extends AbstractBatchIndexTask return Collections.emptyList(); } - // find metadata for interval + // find metadata for intervals with real data segments // queryableIndexAndSegments is sorted by the interval of the dataSegment - final List> queryableIndexAndSegments = loadSegments( + // Note that this list will contain null QueriableIndex values for tombstones + final List> queryableIndexAndSegments = loadSegments( timelineSegments, segmentFileMap, toolbox.getIndexIO() @@ -568,8 +572,10 @@ public class CompactionTask extends AbstractBatchIndexTask final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(); if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) { + final List specs = new ArrayList<>(); + // original granularity - final Map>> intervalToSegments = new TreeMap<>( + final Map>> intervalToSegments = new TreeMap<>( Comparators.intervalsByStartThenEnd() ); queryableIndexAndSegments.forEach( @@ -578,11 +584,11 @@ public class CompactionTask extends AbstractBatchIndexTask ); // unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec - List>>> intervalToSegmentsUnified = + List>>> intervalToSegmentsUnified = new ArrayList<>(); Interval union = null; - List> segments = new ArrayList<>(); - for (Entry>> entry : intervalToSegments.entrySet()) { + List> segments = new ArrayList<>(); + for (Entry>> entry : intervalToSegments.entrySet()) { Interval cur = entry.getKey(); if (union == null) { union = cur; @@ -596,12 +602,12 @@ public class CompactionTask extends AbstractBatchIndexTask segments = new 
ArrayList<>(entry.getValue()); } } + intervalToSegmentsUnified.add(new NonnullPair<>(union, segments)); - final List specs = new ArrayList<>(intervalToSegmentsUnified.size()); - for (NonnullPair>> entry : intervalToSegmentsUnified) { + for (NonnullPair>> entry : intervalToSegmentsUnified) { final Interval interval = entry.lhs; - final List> segmentsToCompact = entry.rhs; + final List> segmentsToCompact = entry.rhs; // If granularitySpec is not null, then set segmentGranularity. Otherwise, // creates new granularitySpec and set segmentGranularity Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(); @@ -700,22 +706,19 @@ public class CompactionTask extends AbstractBatchIndexTask LockGranularity lockGranularityInUse ) throws IOException, SegmentLoadingException { - final List usedSegmentsMinusTombstones = - segmentProvider.findSegments(toolbox.getTaskActionClient()) - .stream() - .filter(dataSegment -> !dataSegment.isTombstone()) // skip tombstones - .collect(Collectors.toList()); - segmentProvider.checkSegments(lockGranularityInUse, usedSegmentsMinusTombstones); - final Map segmentFileMap = toolbox.fetchSegments(usedSegmentsMinusTombstones); + final List usedSegments = + segmentProvider.findSegments(toolbox.getTaskActionClient()); + segmentProvider.checkSegments(lockGranularityInUse, usedSegments); + final Map segmentFileMap = toolbox.fetchSegments(usedSegments); final List> timelineSegments = VersionedIntervalTimeline - .forSegments(usedSegmentsMinusTombstones) + .forSegments(usedSegments) .lookup(segmentProvider.interval); return new NonnullPair<>(segmentFileMap, timelineSegments); } private static DataSchema createDataSchema( String dataSource, - List> queryableIndexAndSegments, + List> queryableIndexAndSegments, @Nullable DimensionsSpec dimensionsSpec, @Nullable ClientCompactionTaskTransformSpec transformSpec, @Nullable AggregatorFactory[] metricsSpec, @@ -781,34 +784,36 @@ public class CompactionTask extends AbstractBatchIndexTask private static void decideRollupAndQueryGranularityCarryOver( SettableSupplier rollup, SettableSupplier queryGranularity, - List> queryableIndexAndSegments + List> queryableIndexAndSegments ) { final SettableSupplier rollupIsValid = new SettableSupplier<>(true); - for (NonnullPair pair : queryableIndexAndSegments) { + for (Pair pair : queryableIndexAndSegments) { final QueryableIndex index = pair.lhs; - if (index.getMetadata() == null) { - throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId()); - } - // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment: - - // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false - if (rollupIsValid.get()) { - Boolean isRollup = index.getMetadata().isRollup(); - if (isRollup == null) { - rollupIsValid.set(false); - rollup.set(false); - } else if (rollup.get() == null) { - rollup.set(isRollup); - } else if (!rollup.get().equals(isRollup.booleanValue())) { - rollupIsValid.set(false); - rollup.set(false); + if (index != null) { // avoid tombstones + if (index.getMetadata() == null) { + throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId()); } - } + // carry-overs (i.e. 
query granularity & rollup) are valid iff they are the same in every segment: - // Pick the finer, non-null, of the query granularities of the segments being compacted - Granularity current = index.getMetadata().getQueryGranularity(); - queryGranularity.set(compareWithCurrent(queryGranularity.get(), current)); + // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false + if (rollupIsValid.get()) { + Boolean isRollup = index.getMetadata().isRollup(); + if (isRollup == null) { + rollupIsValid.set(false); + rollup.set(false); + } else if (rollup.get() == null) { + rollup.set(isRollup); + } else if (!rollup.get().equals(isRollup.booleanValue())) { + rollupIsValid.set(false); + rollup.set(false); + } + } + + // Pick the finer, non-null, of the query granularities of the segments being compacted + Granularity current = index.getMetadata().getQueryGranularity(); + queryGranularity.set(compareWithCurrent(queryGranularity.get(), current)); + } } } @@ -828,22 +833,28 @@ public class CompactionTask extends AbstractBatchIndexTask } private static AggregatorFactory[] createMetricsSpec( - List> queryableIndexAndSegments + List> queryableIndexAndSegments ) { final List aggregatorFactories = queryableIndexAndSegments .stream() + .filter(pair -> pair.lhs != null) // avoid tombstones .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata() .collect(Collectors.toList()); final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories); if (mergedAggregators == null) { - throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories); + Optional> pair = + queryableIndexAndSegments.stream().filter(p -> !p.rhs.isTombstone()).findFirst(); + if (pair.isPresent()) { + // this means that there are true data segments, so something went wrong + throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories); + } } return mergedAggregators; } - private static DimensionsSpec createDimensionsSpec(List> queryableIndices) + private static DimensionsSpec createDimensionsSpec(List> queryableIndices) { final BiMap uniqueDims = HashBiMap.create(); final Map dimensionSchemaMap = new HashMap<>(); @@ -859,33 +870,35 @@ public class CompactionTask extends AbstractBatchIndexTask ); int index = 0; - for (NonnullPair pair : Lists.reverse(queryableIndices)) { + for (Pair pair : Lists.reverse(queryableIndices)) { final QueryableIndex queryableIndex = pair.lhs; - final Map dimensionHandlerMap = queryableIndex.getDimensionHandlers(); + if (queryableIndex != null) { // avoid tombstones + final Map dimensionHandlerMap = queryableIndex.getDimensionHandlers(); - for (String dimension : queryableIndex.getAvailableDimensions()) { - final ColumnHolder columnHolder = Preconditions.checkNotNull( - queryableIndex.getColumnHolder(dimension), - "Cannot find column for dimension[%s]", - dimension - ); - - if (!uniqueDims.containsKey(dimension)) { - final DimensionHandler dimensionHandler = Preconditions.checkNotNull( - dimensionHandlerMap.get(dimension), - "Cannot find dimensionHandler for dimension[%s]", + for (String dimension : queryableIndex.getAvailableDimensions()) { + final ColumnHolder columnHolder = Preconditions.checkNotNull( + queryableIndex.getColumnHolder(dimension), + "Cannot find column for dimension[%s]", dimension ); - uniqueDims.put(dimension, index++); - dimensionSchemaMap.put( - dimension, - createDimensionSchema( - dimension, - columnHolder.getCapabilities(), - 
dimensionHandler.getMultivalueHandling() - ) - ); + if (!uniqueDims.containsKey(dimension)) { + final DimensionHandler dimensionHandler = Preconditions.checkNotNull( + dimensionHandlerMap.get(dimension), + "Cannot find dimensionHandler for dimension[%s]", + dimension + ); + + uniqueDims.put(dimension, index++); + dimensionSchemaMap.put( + dimension, + createDimensionSchema( + dimension, + columnHolder.getCapabilities(), + dimensionHandler.getMultivalueHandling() + ) + ); + } } } } @@ -905,25 +918,33 @@ public class CompactionTask extends AbstractBatchIndexTask return new DimensionsSpec(dimensionSchemas); } - private static List> loadSegments( + /** + * This private method does not load, does not create QueryableIndices, for tombstones since tombstones + * do not have a file image, they are never pushed to deep storage. Thus, for the case of a tombstone, + * The return list + * will contain a null for the QueryableIndex slot in the pair (lhs) + */ + private static List> loadSegments( List> timelineObjectHolders, Map segmentFileMap, IndexIO indexIO ) throws IOException { - final List> segments = new ArrayList<>(); + final List> segments = new ArrayList<>(); for (TimelineObjectHolder timelineObjectHolder : timelineObjectHolders) { final PartitionHolder partitionHolder = timelineObjectHolder.getObject(); for (PartitionChunk chunk : partitionHolder) { + QueryableIndex queryableIndex = null; final DataSegment segment = chunk.getObject(); - final QueryableIndex queryableIndex = indexIO.loadIndex( - Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId()) - ); - segments.add(new NonnullPair<>(queryableIndex, segment)); + if (!chunk.getObject().isTombstone()) { + queryableIndex = indexIO.loadIndex( + Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId()) + ); + } + segments.add(new Pair<>(queryableIndex, segment)); } } - return segments; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 6f1a6d6446f..d10f3d3e9f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -237,7 +237,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI //noinspection ConstantConditions return FluentIterable .from(partitionHolder) - .filter(chunk -> !chunk.getObject().isTombstone()) .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval())); }).iterator(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java index 63f2fe25c57..3396195a216 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java @@ -91,4 +91,10 @@ public class DruidSegmentInputEntity implements InputEntity } }; } + + public boolean isFromTombstone() + { + return segment.isTombstone(); + } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java index 4d028596ff0..732288e8541 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.input; +import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; @@ -55,14 +56,28 @@ public class DruidSegmentInputFormat implements InputFormat File temporaryDirectory ) { - return new DruidSegmentReader( - source, - indexIO, - inputRowSchema.getTimestampSpec(), - inputRowSchema.getDimensionsSpec(), - inputRowSchema.getColumnsFilter(), - dimFilter, - temporaryDirectory + // this method handles the case when the entity comes from a tombstone or from a regular segment + Preconditions.checkArgument( + source instanceof DruidSegmentInputEntity, + DruidSegmentInputEntity.class.getName() + " required, but " + + source.getClass().getName() + " provided." ); + + final InputEntityReader retVal; + // Cast is safe here because of the precondition above passed + if (((DruidSegmentInputEntity) source).isFromTombstone()) { + retVal = new DruidTombstoneSegmentReader(source); + } else { + retVal = new DruidSegmentReader( + source, + indexIO, + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + inputRowSchema.getColumnsFilter(), + dimFilter, + temporaryDirectory + ); + } + return retVal; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 7117eea0a93..87181b8cb49 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.input; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@ -72,7 +71,7 @@ import java.util.Set; public class DruidSegmentReader extends IntermediateRowParsingReader> { - private final DruidSegmentInputEntity source; + private DruidSegmentInputEntity source; private final IndexIO indexIO; private final ColumnsFilter columnsFilter; private final InputRowSchema inputRowSchema; @@ -89,7 +88,6 @@ public class DruidSegmentReader extends IntermediateRowParsingReader> intermediateRowIterator() throws IOException { - final CleanableFile segmentFile = source.fetch(temporaryDirectory, null); + final CleanableFile segmentFile = source().fetch(temporaryDirectory, null); final WindowedStorageAdapter storageAdapter = new WindowedStorageAdapter( new QueryableIndexStorageAdapter( indexIO.loadIndex(segmentFile.file()) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java new file mode 100644 index 00000000000..2a80112df85 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Returns an empty iterator, since a tombstone carries no data rows.
+ */
+public class DruidTombstoneSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
+{
+  private DruidSegmentInputEntity source;
+
+  public DruidTombstoneSegmentReader(
+      InputEntity source
+  )
+  {
+    this.source = (DruidSegmentInputEntity) source;
+    if (!this.source.isFromTombstone()) {
+      throw new IAE("DruidSegmentInputEntity must be created from a tombstone but is not.");
+    }
+  }
+
+  @Override
+  protected CloseableIterator<Map<String, Object>> intermediateRowIterator()
+  {
+    return new CloseableIterator<Map<String, Object>>()
+    {
+      @Override
+      public void close()
+      {
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return false;
+      }
+
+      @Override
+      public Map<String, Object> next()
+      {
+        throw new NoSuchElementException();
+      }
+    };
+  }
+
+  @VisibleForTesting
+  @Override
+  protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow)
+  {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  @Override
+  protected List<Map<String, Object>> toMap(Map<String, Object> intermediateRow)
+  {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 1c3463624ec..bfd184e1375 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.loading.TombstoneLoadSpec;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.server.security.AuthTestUtils;
@@ -1099,6 +1100,120 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }

+  @Test
+  public void testCompactDatasourceOverIntervalWithOnlyTombstones() throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
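An illustrative aside, not part of the patch: the DruidTombstoneSegmentReader introduced above relies on a plain empty-iterator contract, so any row-consuming loop over it yields zero rows. Below is a minimal, JDK-only sketch of that contract; EmptyRowSourceSketch and emptyIterator are hypothetical names, not Druid APIs.

import java.util.Iterator;
import java.util.NoSuchElementException;

final class EmptyRowSourceSketch
{
  // Hypothetical stand-in for intermediateRowIterator(): reports no elements and
  // throws NoSuchElementException from next(), which is all a row-consuming loop
  // needs in order to produce an empty result set.
  static <T> Iterator<T> emptyIterator()
  {
    return new Iterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        return false;
      }

      @Override
      public T next()
      {
        throw new NoSuchElementException();
      }
    };
  }

  public static void main(String[] args)
  {
    int rows = 0;
    Iterator<String> it = emptyIterator();
    while (it.hasNext()) {
      it.next();
      rows++;
    }
    // A tombstone-style source contributes nothing to the ingested data.
    System.out.println("rows read: " + rows); // prints "rows read: 0"
  }
}

The real reader wraps this behavior in the CloseableIterator and IntermediateRowParsingReader plumbing, but the empty-result semantics are exactly this loop.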
+ if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + + // The following task creates (several, more than three, last time I checked, six) HOUR segments with intervals of + // - 2014-01-01T00:00:00/2014-01-01T01:00:00 + // - 2014-01-01T01:00:00/2014-01-01T02:00:00 + // - 2014-01-01T02:00:00/2014-01-01T03:00:00 + // The six segments are: + // three rows in hour 00: + // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z with two rows + // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z_1 with one row + // three rows in hour 01: + // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z with two rows + // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z_1 with one row + // four rows in hour 02: + // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z with two rows + // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z_1 with two rows + // there are 10 rows total in data set + + // maxRowsPerSegment is set to 2 inside the runIndexTask methods + Pair> result = runIndexTask(); + Assert.assertEquals(6, result.rhs.size()); + + final Builder builder = new Builder( + DATA_SOURCE, + segmentCacheManagerFactory, + RETRY_POLICY_FACTORY + ); + + // Setup partial interval compaction: + // Change the granularity from HOUR to MINUTE through compaction for hour 01, there are three rows in the compaction + // interval, + // all three in the same timestamp (see TEST_ROWS), this should generate one segment in same, first, minute + // (task will now use + // the default rows per segments since compaction's tuning config is null) and + // 59 tombstones to completely overshadow the existing hour 01 segment. Since the segments outside the + // compaction interval should remanin unchanged there should be a total of 1 + (2 + 59) + 2 = 64 segments + + // **** PARTIAL COMPACTION: hour -> minute **** + final Interval compactionPartialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00"); + final CompactionTask partialCompactionTask = builder + .segmentGranularity(Granularities.MINUTE) + // Set dropExisting to true + .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true) + .build(); + final Pair> partialCompactionResult = runTask(partialCompactionTask); + Assert.assertTrue(partialCompactionResult.lhs.isSuccess()); + + // Segments that did not belong in the compaction interval (hours 00 and 02) are expected unchanged + // add 2 unchanged segments for hour 00: + final Set expectedSegments = new HashSet<>(); + expectedSegments.addAll( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01T00:00:00/2014-01-01T01:00:00")), + Segments.ONLY_VISIBLE + ) + ); + // add 2 unchanged segments for hour 02: + expectedSegments.addAll( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01T02:00:00/2014-01-01T03:00:00")), + Segments.ONLY_VISIBLE + ) + ); + expectedSegments.addAll(partialCompactionResult.rhs); + Assert.assertEquals(64, expectedSegments.size()); + + // New segments that were compacted are expected. 
However, old segments of the compacted interval should be + // overshadowed by the new tombstones (59) being created for all minutes other than 01:01 + final Set segmentsAfterPartialCompaction = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForIntervals( + DATA_SOURCE, + Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")), + Segments.ONLY_VISIBLE + ) + ); + Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction); + final List realSegmentsAfterPartialCompaction = + segmentsAfterPartialCompaction.stream() + .filter(s -> !s.isTombstone()) + .collect(Collectors.toList()); + final List tombstonesAfterPartialCompaction = + segmentsAfterPartialCompaction.stream() + .filter(s -> s.isTombstone()) + .collect(Collectors.toList()); + Assert.assertEquals(59, tombstonesAfterPartialCompaction.size()); + Assert.assertEquals(5, realSegmentsAfterPartialCompaction.size()); + Assert.assertEquals(64, segmentsAfterPartialCompaction.size()); + + // Now setup compaction over an interval with only tombstones, keeping same, minute granularity + final CompactionTask compactionTaskOverOnlyTombstones = builder + .segmentGranularity(null) + // Set dropExisting to true + // last 59 minutes of our 01, should be all tombstones + .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01T01:01:00/2014-01-01T02:00:00"), null), true) + .build(); + + // **** Compaction over tombstones **** + final Pair> resultOverOnlyTombstones = runTask(compactionTaskOverOnlyTombstones); + Assert.assertTrue(resultOverOnlyTombstones.lhs.isSuccess()); + + // compaction should not fail but since it is over the same granularity it should leave + // the tombstones unchanged + Assert.assertEquals(59, resultOverOnlyTombstones.rhs.size()); + resultOverOnlyTombstones.rhs.forEach(t -> Assert.assertTrue(t.isTombstone())); + } + @Test public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception { @@ -1454,6 +1569,7 @@ public class CompactionTaskRunTest extends IngestionTestBase final ObjectMapper objectMapper = getObjectMapper(); objectMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local")); objectMapper.registerSubtypes(LocalDataSegmentPuller.class); + objectMapper.registerSubtypes(TombstoneLoadSpec.class); final TaskToolbox box = createTaskToolbox(objectMapper, task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java index 29bbe5c97f6..876458d9176 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java @@ -179,4 +179,118 @@ public class DimensionCardinalityReportTest intervalToNumShards ); } + + + @Test + public void testLargeSupervisorDetermineNumShardsFromCardinalityReport() + { + List reports = new ArrayList<>(); + + HllSketch collector1 = DimensionCardinalityReport.createHllSketchForReport(); + collector1.update(IndexTask.HASH_FUNCTION.hashLong(1L).asBytes()); + collector1.update(IndexTask.HASH_FUNCTION.hashLong(200L).asBytes()); + DimensionCardinalityReport report1 = new DimensionCardinalityReport( + "taskA", + ImmutableMap.of( + Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"), + 
collector1.toCompactByteArray() + ) + ); + reports.add(report1); + + HllSketch collector2 = DimensionCardinalityReport.createHllSketchForReport(); + collector2.update(IndexTask.HASH_FUNCTION.hashLong(1000L).asBytes()); + collector2.update(IndexTask.HASH_FUNCTION.hashLong(30000L).asBytes()); + DimensionCardinalityReport report2 = new DimensionCardinalityReport( + "taskB", + ImmutableMap.of( + Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"), + collector2.toCompactByteArray() + ) + ); + reports.add(report2); + + // Separate interval with only 1 value + HllSketch collector3 = DimensionCardinalityReport.createHllSketchForReport(); + collector3.update(IndexTask.HASH_FUNCTION.hashLong(99000L).asBytes()); + DimensionCardinalityReport report3 = new DimensionCardinalityReport( + "taskC", + ImmutableMap.of( + Intervals.of("1970-01-02T00:00:00.000Z/1970-01-03T00:00:00.000Z"), + collector3.toCompactByteArray() + ) + ); + reports.add(report3); + + // first interval in test has cardinality 4 + Map intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport( + reports, + 1 + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("1970-01-01/P1D"), + 4, + Intervals.of("1970-01-02/P1D"), + 1 + ), + intervalToNumShards + ); + + intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport( + reports, + 2 + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("1970-01-01/P1D"), + 2, + Intervals.of("1970-01-02/P1D"), + 1 + ), + intervalToNumShards + ); + + intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport( + reports, + 3 + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("1970-01-01/P1D"), + 1, + Intervals.of("1970-01-02/P1D"), + 1 + ), + intervalToNumShards + ); + + intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport( + reports, + 4 + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("1970-01-01/P1D"), + 1, + Intervals.of("1970-01-02/P1D"), + 1 + ), + intervalToNumShards + ); + + intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport( + reports, + 5 + ); + Assert.assertEquals( + ImmutableMap.of( + Intervals.of("1970-01-01/P1D"), + 1, + Intervals.of("1970-01-02/P1D"), + 1 + ), + intervalToNumShards + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java new file mode 100644 index 00000000000..910371a5669 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.input; + +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.FileEntity; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.java.util.common.Intervals; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertThrows; + + +public class DruidSegmentInputFormatTest +{ + + private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))), + ColumnsFilter.all() + ); + + + @Test + public void testDruidSegmentInputEntityReader() + { + DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null); + InputEntityReader reader = format.createReader( + INPUT_ROW_SCHEMA, + DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null), + null + ); + Assert.assertTrue(reader instanceof DruidSegmentReader); + } + + + @Test + public void testDruidTombstoneSegmentInputEntityReader() + { + DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null); + InputEntityReader reader = format.createReader( + INPUT_ROW_SCHEMA, + DruidSegmentReaderTest.makeTombstoneInputEntity(Intervals.of("2000/P1D")), + null + ); + Assert.assertTrue(reader instanceof DruidTombstoneSegmentReader); + } + + @Test + public void testDruidSegmentInputEntityReaderBadEntity() + { + DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + format.createReader( + INPUT_ROW_SCHEMA, + new FileEntity(null), + null + ); + }); + String expectedMessage = + "org.apache.druid.indexing.input.DruidSegmentInputEntity required, but org.apache.druid.data.input.impl.FileEntity provided."; + String actualMessage = exception.getMessage(); + Assert.assertEquals(expectedMessage, actualMessage); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java index 9301b891e40..1638e79e9a7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FileEntity; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.hll.HyperLogLogCollector; @@ -52,6 +53,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.TombstoneShardSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -68,6 +70,8 @@ import java.util.List; import java.util.Map; import 
java.util.concurrent.ExecutorService; +import static org.junit.Assert.assertThrows; + public class DruidSegmentReaderTest extends NullHandlingTest { @Rule @@ -186,6 +190,42 @@ public class DruidSegmentReaderTest extends NullHandlingTest ); } + @Test + public void testDruidTombstoneSegmentReader() throws IOException + { + final DruidTombstoneSegmentReader reader = new DruidTombstoneSegmentReader( + makeTombstoneInputEntity(Intervals.of("2000/P1D")) + ); + + Assert.assertFalse(reader.intermediateRowIterator().hasNext()); + Assert.assertEquals( + Collections.emptyList(), + readRows(reader) + ); + } + + @Test + public void testDruidTombstoneSegmentReaderBadEntity() + { + assertThrows(ClassCastException.class, () -> { + new DruidTombstoneSegmentReader( + new FileEntity(null)); + }); + } + + @Test + public void testDruidTombstoneSegmentReaderNotCreatedFromTombstone() + { + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + new DruidTombstoneSegmentReader(makeInputEntity(Intervals.of("2000/P1D"))); + }); + String expectedMessage = + "DruidSegmentInputEntity must be created from a tombstone but is not."; + String actualMessage = exception.getMessage(); + Assert.assertEquals(expectedMessage, actualMessage); + + } + @Test public void testReaderAutoTimestampFormat() throws IOException { @@ -582,6 +622,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest } private DruidSegmentInputEntity makeInputEntity(final Interval interval) + { + return makeInputEntity(interval, segmentDirectory); + } + + public static DruidSegmentInputEntity makeInputEntity(final Interval interval, final File segmentDirectory) { return new DruidSegmentInputEntity( new SegmentCacheManager() @@ -634,7 +679,62 @@ public class DruidSegmentReaderTest extends NullHandlingTest ); } - private List readRows(final DruidSegmentReader reader) throws IOException + public static DruidSegmentInputEntity makeTombstoneInputEntity(final Interval interval) + { + return new DruidSegmentInputEntity( + new SegmentCacheManager() + { + @Override + public boolean isSegmentCached(DataSegment segment) + { + throw new UnsupportedOperationException("unused"); + } + + @Override + public File getSegmentFiles(DataSegment segment) + { + throw new UnsupportedOperationException("unused"); + } + + @Override + public void cleanup(DataSegment segment) + { + throw new UnsupportedOperationException("unused"); + } + + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec) + { + throw new UnsupportedOperationException(); + + } + }, + DataSegment.builder() + .dataSource("ds") + .interval(Intervals.of("2000/P1D")) + .version("1") + .shardSpec(new TombstoneShardSpec()) + .loadSpec(ImmutableMap.of("type", "tombstone")) + .size(1) + .build(), + interval + ); + } + + + private List readRows(DruidSegmentReader reader) throws IOException { final List rows = new ArrayList<>(); try (final CloseableIterator> iterator = reader.intermediateRowIterator()) { @@ -645,6 +745,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest return rows; } + private List readRows(DruidTombstoneSegmentReader reader) throws IOException + { + final List rows = new ArrayList<>(); + try (final CloseableIterator> iterator = reader.intermediateRowIterator()) { + while (iterator.hasNext()) { + 
rows.addAll(reader.parseInputRows(iterator.next())); + } + } + return rows; + } + + private static HyperLogLogCollector makeHLLC(final String... values) { final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 07a9bd62ded..7dd7591615c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -67,6 +68,7 @@ import org.apache.druid.tests.TestNGGroup; import org.apache.druid.tests.indexer.AbstractITBatchIndexTest; import org.apache.druid.tests.indexer.AbstractIndexerTest; import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; import org.joda.time.chrono.ISOChronology; @@ -80,6 +82,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -502,10 +505,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest 1, 1, 0); - Assert.assertEquals( - "14906", - compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize") - ); + Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14906"); // Run compaction again to compact the remaining day // Remaining day compacted (1 new segment). 
Now both days compacted (2 total) forceTriggerAutoCompaction(2); @@ -530,7 +530,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest @Test public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception { + // Interval is "2013-08-31/2013-09-02", segment gran is DAY, + // "maxRowsPerSegment": 3 + // input files: + // "/resources/data/batch_index/json/wikipedia_index_data1.json", + // 3rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z" + // "/resources/data/batch_index/json/wikipedia_index_data2.json", + // 3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z" + // "/resources/data/batch_index/json/wikipedia_index_data3.json" + // 4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z" + // Summary of data: + // 5 rows @ 2013-08031 and 5 at 2013-0901, TWO days have data only + // Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones + // 1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each) + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + // 2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments) + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + // Four data segments (two months) and 10 tombstones for remaining months + // 3d compaction: SEMESTER: 5 rows @ 2013-08-31 (two segments), 5 rows @ 2013-09-01 (two segments), + // 2 compactions were generated for year 2013; one for each semester to be compacted of the whole year. + // loadData(INDEX_TASK); + try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); @@ -538,12 +559,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); + + LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true"); Granularity newGranularity = Granularities.YEAR; // Set dropExisting to true + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); - LOG.info("Auto compaction test with YEAR segment granularity"); - List expectedIntervalAfterCompaction = new ArrayList<>(); for (String interval : intervalsBeforeCompaction) { for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { @@ -555,30 +577,169 @@ public class ITAutoCompactionTest extends AbstractIndexerTest verifySegmentsCompacted(1, 1000); checkCompactionIntervals(expectedIntervalAfterCompaction); - newGranularity = Granularities.DAY; + + LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true"); + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + newGranularity = Granularities.MONTH; // Set dropExisting to true submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); - LOG.info("Auto compaction test with DAY segment granularity"); - // Since dropExisting is set to true... 
+ // Again data is only in two days // The earlier segment with YEAR granularity will be completely covered, overshadowed, by the - // new DAY segments for data and tombstones for days with no data - // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02 - // plus 363 tombstones - final List intervalsAfterYEARCompactionButBeforeDAYCompaction = + // new MONTH segments for data and tombstones for days with no data + // Hence, we will only have 2013-08 to 2013-09 months with data + // plus 12 tombstones + final List intervalsAfterYEARCompactionButBeforeMONTHCompaction = coordinator.getSegmentIntervals(fullDatasourceName); expectedIntervalAfterCompaction = new ArrayList<>(); - for (String interval : intervalsAfterYEARCompactionButBeforeDAYCompaction) { + for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) { for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { expectedIntervalAfterCompaction.add(newinterval.toString()); } } - forceTriggerAutoCompaction(365); + forceTriggerAutoCompaction(12); verifyQuery(INDEX_QUERIES_RESOURCE); - verifyTombstones(363); - verifySegmentsCompacted(365, 1000); + verifyTombstones(10); + verifySegmentsCompacted(12, 1000); checkCompactionIntervals(expectedIntervalAfterCompaction); + + LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is true, over tombstones"); + // only reason is semester and not quarter or month is to minimize time in the test but to + // ensure that one of the compactions compacts *only* tombstones. The first semester will + // compact only tombstones, so it should be a tombstone itself. + newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); + // Set dropExisting to true + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + + // Since dropExisting is set to true... + // The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the + // new PT6M segments for data and tombstones for days with no data + // Hence, we will have two segments, one tombstone for the first semester and one data segment for the second. 
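To make the expected semester buckets concrete, here is a small self-contained sketch assuming only joda-time on the classpath (the same Period and DateTimeZone types the test already uses); it is not the Druid PeriodGranularity implementation, just the arithmetic behind the two P6M intervals asserted below.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;

public class SemesterBucketsSketch
{
  public static void main(String[] args)
  {
    // A P6M granularity splits 2013 into two semester-sized buckets.
    final Period p6m = Period.parse("P6M");
    final DateTime yearStart = new DateTime("2013-01-01T00:00:00.000Z", DateTimeZone.UTC);
    for (int i = 0; i < 2; i++) {
      DateTime bucketStart = yearStart.plus(p6m.multipliedBy(i));
      DateTime bucketEnd = bucketStart.plus(p6m);
      System.out.println(new Interval(bucketStart, bucketEnd));
    }
    // Prints:
    // 2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z
    // 2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z
    // Only the second bucket overlaps the ingested data (2013-08 and 2013-09),
    // so the first semester produced by compaction is a tombstone.
  }
}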
+ forceTriggerAutoCompaction(2); // two semesters compacted + verifyQuery(INDEX_QUERIES_RESOURCE); + verifyTombstones(1); + verifySegmentsCompacted(2, 1000); + + expectedIntervalAfterCompaction = + Arrays.asList("2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z", + "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z" + ); + checkCompactionIntervals(expectedIntervalAfterCompaction); + + // verify that autocompaction completed before + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + forceTriggerAutoCompaction(2); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); + + } + } + + @Test + public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception + { + // Interval is "2013-08-31/2013-09-02", segment gran is DAY, + // "maxRowsPerSegment": 3 + // input files: + // "/resources/data/batch_index/json/wikipedia_index_data1.json", + // 3rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z" + // "/resources/data/batch_index/json/wikipedia_index_data2.json", + // 3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z" + // "/resources/data/batch_index/json/wikipedia_index_data3.json" + // 4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z" + // Summary of data: + // 5 rows @ 2013-08031 and 5 at 2013-0901, TWO days have data only + // Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones + // 1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each) + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + // 2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments) + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + // Four data segments (two months) and 10 tombstones for remaining months + // 3d compaction: SEMESTER: 5 rows @ 2013-08-31, 5 rows @ 2013-09-01 (two segment), + // 2 compactions were generated for year 2013; one for each semester to be compacted of the whole year. + // + loadData(INDEX_TASK); + + try (final Closeable ignored = unloader(fullDatasourceName)) { + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + // 4 segments across 2 days (4 total)... 
+ verifySegmentsCount(4); + verifyQuery(INDEX_QUERIES_RESOURCE); + + + LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true"); + Granularity newGranularity = Granularities.YEAR; + // Set dropExisting to true + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + + List expectedIntervalAfterCompaction = new ArrayList<>(); + for (String interval : intervalsBeforeCompaction) { + for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + expectedIntervalAfterCompaction.add(newinterval.toString()); + } + } + forceTriggerAutoCompaction(1); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifySegmentsCompacted(1, 1000); + checkCompactionIntervals(expectedIntervalAfterCompaction); + + + LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true"); + // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z", + newGranularity = Granularities.MONTH; + // Set dropExisting to true + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true); + + // Since dropExisting is set to true... + // Again data is only in two days + // The earlier segment with YEAR granularity will be completely covered, overshadowed, by the + // new MONTH segments for data and tombstones for days with no data + // Hence, we will only have 2013-08 to 2013-09 months with data + // plus 12 tombstones + final List intervalsAfterYEARCompactionButBeforeMONTHCompaction = + coordinator.getSegmentIntervals(fullDatasourceName); + expectedIntervalAfterCompaction = new ArrayList<>(); + for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) { + for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) { + expectedIntervalAfterCompaction.add(newinterval.toString()); + } + } + forceTriggerAutoCompaction(12); + verifyQuery(INDEX_QUERIES_RESOURCE); + verifyTombstones(10); + verifySegmentsCompacted(12, 1000); + checkCompactionIntervals(expectedIntervalAfterCompaction); + + // Now compact again over tombstones but with dropExisting set to false: + LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones"); + newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC); + // Set dropExisting to false + submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, + null, null + ), false); + + // Since dropExisting is set to false the first semester will be forced to dropExisting true + // Hence, we will have two, one tombstone for the first semester and one data segment for the second. 
+ forceTriggerAutoCompaction(2); // two semesters compacted + verifyQuery(INDEX_QUERIES_RESOURCE); + verifyTombstones(1); + verifySegmentsCompacted(2, 1000); + + expectedIntervalAfterCompaction = + Arrays.asList( + "2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z", + "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z" + ); + checkCompactionIntervals(expectedIntervalAfterCompaction); + + // verify that autocompaction completed before + List compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName); + forceTriggerAutoCompaction(2); + List compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName); + Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index e3f47ddf247..ddae02298bd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -427,6 +427,26 @@ public class CompactSegments implements CoordinatorCustomDuty dropExisting = config.getIoConfig().isDropExisting(); } + // If all the segments found to be compacted are tombstones then dropExisting + // needs to be forced to true. This forcing needs to happen in the case that + // the flag is null, or it is false. It is needed when it is null to avoid the + // possibility of the code deciding to default it to false later. + // Forcing the flag to true will enable the task ingestion code to generate new, compacted, tombstones to + // cover the tombstones found to be compacted as well as to mark them + // as compacted (update their lastCompactionState). If we don't force the + // flag then every time this compact duty runs it will find the same tombstones + // in the interval since their lastCompactionState + // was not set repeating this over and over and the duty will not make progress; it + // will become stuck on this set of tombstones. + // This forcing code should be revised + // when/if the autocompaction code policy to decide which segments to compact changes + if (dropExisting == null || !dropExisting) { + if (segmentsToCompact.stream().allMatch(dataSegment -> dataSegment.isTombstone())) { + dropExisting = true; + LOG.info("Forcing dropExisting to %s since all segments to compact are tombstones", dropExisting); + } + } + // make tuningConfig final String taskId = indexingServiceClient.compactSegments( "coordinator-issued",
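As a closing illustration of the rule added to CompactSegments above, here is a self-contained sketch of when dropExisting is forced to true; Seg and resolveDropExisting are hypothetical stand-ins for DataSegment and the inline coordinator logic, not Druid classes.

import java.util.Arrays;
import java.util.List;

public class DropExistingRuleSketch
{
  // Hypothetical stand-in for DataSegment, keeping only the isTombstone() flag.
  static final class Seg
  {
    private final boolean tombstone;

    Seg(boolean tombstone)
    {
      this.tombstone = tombstone;
    }

    boolean isTombstone()
    {
      return tombstone;
    }
  }

  // Returns the dropExisting value to hand to the compaction task: forced to true when
  // every candidate segment is a tombstone, otherwise the configured value is passed
  // through unchanged (how a null is defaulted downstream is out of scope for this sketch).
  static Boolean resolveDropExisting(Boolean configured, List<Seg> segmentsToCompact)
  {
    if ((configured == null || !configured) && segmentsToCompact.stream().allMatch(Seg::isTombstone)) {
      return true;
    }
    return configured;
  }

  public static void main(String[] args)
  {
    List<Seg> onlyTombstones = Arrays.asList(new Seg(true), new Seg(true));
    List<Seg> mixed = Arrays.asList(new Seg(true), new Seg(false));

    System.out.println(resolveDropExisting(null, onlyTombstones));  // true  (forced)
    System.out.println(resolveDropExisting(false, onlyTombstones)); // true  (forced)
    System.out.println(resolveDropExisting(null, mixed));           // null  (left for later defaulting)
    System.out.println(resolveDropExisting(true, mixed));           // true  (already set)
  }
}

Forcing the flag only when every candidate is a tombstone keeps the auto-compaction duty from re-selecting the same tombstones on every run, while leaving the user's configuration untouched for intervals that still contain real data segments.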