diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java index 9c2ec8b4a8c..837c8770290 100644 --- a/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineBenchmark.java @@ -67,7 +67,7 @@ public class VersionedIntervalTimelineBenchmark @Param({"10", "100", "1000"}) private int numInitialRootGenSegmentsPerInterval; - @Param({"1", "5"}) + @Param({"1", "2"}) private int numNonRootGenerations; @Param({"false", "true"}) diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index ee2a9f36f68..783bf8bd2d3 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -22,12 +22,12 @@ package org.apache.druid.timeline; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterators; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.timeline.partition.ImmutablePartitionHolder; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.utils.CollectionUtils; @@ -158,14 +158,17 @@ public class VersionedIntervalTimeline findNonOvershadowedObjectsInInterval(Interval interval, Partitions completeness) { - return lookup(interval, completeness) - .stream() - .flatMap(timelineObjectHolder -> timelineObjectHolder.getObject().stream()) - .map(PartitionChunk::getObject) - .collect(Collectors.toSet()); + return FluentIterable + .from(lookup(interval, completeness)) + .transformAndConcat(TimelineObjectHolder::getObject) + .transform(PartitionChunk::getObject) + .toSet(); } public void add(final Interval interval, VersionType version, PartitionChunk object) @@ -278,7 +281,7 @@ public class VersionedIntervalTimeline(foundEntry.getPartitionHolder()); + return foundEntry.getPartitionHolder().asImmutable(); } } } @@ -362,7 +365,7 @@ public class VersionedIntervalTimeline(entry.getPartitionHolder()) + PartitionHolder.copyWithOnlyVisibleChunks(entry.getPartitionHolder()) ); } @@ -381,9 +384,13 @@ public class VersionedIntervalTimeline> overshadowedObjects = overshadowedPartitionsTimeline .values() .stream() - .flatMap( - (Map entry) -> entry.values().stream().map(this::timelineEntryToObjectHolder) - ) + .flatMap((Map entry) -> entry.values().stream()) + .map(entry -> new TimelineObjectHolder<>( + entry.getTrueInterval(), + entry.getTrueInterval(), + entry.getVersion(), + PartitionHolder.deepCopy(entry.getPartitionHolder()) + )) .collect(Collectors.toSet()); // 2. Visible timelineEntries can also have overshadowed objects. Add them to the result too. @@ -482,7 +489,12 @@ public class VersionedIntervalTimeline 0) { return false; } else if (versionCompare == 0) { - if (timelineEntry.partitionHolder.stream().noneMatch(chunk -> chunk.getObject().overshadows(object))) { + // Intentionally use the Iterators API instead of the stream API for performance. + //noinspection ConstantConditions + final boolean nonOvershadowedObject = Iterators.all( + timelineEntry.partitionHolder.iterator(), chunk -> !chunk.getObject().overshadows(object) + ); + if (nonOvershadowedObject) { return false; } } @@ -724,7 +736,7 @@ public class VersionedIntervalTimeline(val.getPartitionHolder()) + PartitionHolder.copyWithOnlyVisibleChunks(val.getPartitionHolder()) ) ); } diff --git a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java index 1386c14c709..e9090aedf26 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/AtomicUpdateGroup.java @@ -45,11 +45,21 @@ class AtomicUpdateGroup> implements Overshadowable> chunks = new ArrayList<>(); + static > AtomicUpdateGroup copy(AtomicUpdateGroup group) + { + return new AtomicUpdateGroup<>(group.chunks); + } + AtomicUpdateGroup(PartitionChunk chunk) { this.chunks.add(chunk); } + private AtomicUpdateGroup(List> chunks) + { + this.chunks.addAll(chunks); + } + public void add(PartitionChunk chunk) { if (isFull()) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java index 65c1b0f97ba..9a3bf115cbb 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/ImmutablePartitionHolder.java @@ -25,9 +25,9 @@ import org.apache.druid.timeline.Overshadowable; */ public class ImmutablePartitionHolder> extends PartitionHolder { - public ImmutablePartitionHolder(PartitionHolder partitionHolder) + protected ImmutablePartitionHolder(OvershadowableManager overshadowableManager) { - super(partitionHolder); + super(overshadowableManager); } @Override diff --git a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java index fcc77ddcd85..a4529f31cef 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java @@ -21,7 +21,8 @@ package org.apache.druid.timeline.partition; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterators; import it.unimi.dsi.fastutil.objects.AbstractObjectCollection; import it.unimi.dsi.fastutil.objects.ObjectCollection; import it.unimi.dsi.fastutil.objects.ObjectIterator; @@ -44,7 +45,9 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -52,9 +55,6 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.function.BiPredicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details. @@ -105,22 +105,46 @@ class OvershadowableManager> this.overshadowedGroups = new TreeMap<>(); } - OvershadowableManager(OvershadowableManager other) + public static > OvershadowableManager copyVisible(OvershadowableManager original) { - this.knownPartitionChunks = new HashMap<>(other.knownPartitionChunks); - this.standbyGroups = new TreeMap<>(other.standbyGroups); - this.visibleGroupPerRange = new TreeMap<>(other.visibleGroupPerRange); - this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups); + final OvershadowableManager copy = new OvershadowableManager<>(); + original.visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> { + // There should be only one group per partition range + final AtomicUpdateGroup group = versionToGroups.values().iterator().next(); + group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk)); + + copy.visibleGroupPerRange.put( + partitionRange, + new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group)) + ); + }); + return copy; } - private OvershadowableManager(List> groups) + public static > OvershadowableManager deepCopy(OvershadowableManager original) { - this(); - for (AtomicUpdateGroup entry : groups) { - for (PartitionChunk chunk : entry.getChunks()) { - addChunk(chunk); - } - } + final OvershadowableManager copy = copyVisible(original); + original.overshadowedGroups.forEach((partitionRange, versionToGroups) -> { + // There should be only one group per partition range + final AtomicUpdateGroup group = versionToGroups.values().iterator().next(); + group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk)); + + copy.overshadowedGroups.put( + partitionRange, + new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group)) + ); + }); + original.standbyGroups.forEach((partitionRange, versionToGroups) -> { + // There should be only one group per partition range + final AtomicUpdateGroup group = versionToGroups.values().iterator().next(); + group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk)); + + copy.standbyGroups.put( + partitionRange, + new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group)) + ); + }); + return copy; } private TreeMap>> getStateMap(State state) @@ -168,7 +192,7 @@ class OvershadowableManager> private void replaceVisibleWith( Collection> oldVisibleGroups, State newStateOfOldVisibleGroup, - List> newVisibleGroups, + Collection> newVisibleGroups, State oldStateOfNewVisibleGroups ) { @@ -263,27 +287,17 @@ class OvershadowableManager> * @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions * than this. * @param fromState the state to search for overshadowed groups. - * * @return a list of found atomicUpdateGroups. It could be empty if no groups are found. */ @VisibleForTesting List> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState) { final TreeMap>> stateMap = getStateMap(fromState); - Entry>> current = findLowestOverlappingEntry( - rangeOfAug, - stateMap, - true - ); - - if (current == null) { - return Collections.emptyList(); - } - - // Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug. - // Note that RootPartitionRange of entries are always consecutive. final List> found = new ArrayList<>(); - while (current != null && rangeOfAug.overlaps(current.getKey())) { + final Iterator>>> iterator = + entryIteratorGreaterThan(rangeOfAug.startPartitionId, stateMap); + while (iterator.hasNext()) { + final Entry>> current = iterator.next(); if (rangeOfAug.contains(current.getKey())) { // versionToGroup is sorted by minorVersion. // versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions @@ -296,8 +310,9 @@ class OvershadowableManager> if (versionToGroup.firstShortKey() < minorVersion) { found.addAll(versionToGroup.headMap(minorVersion).values()); } + } else { + break; } - current = stateMap.higherEntry(current.getKey()); } return found; } @@ -318,27 +333,23 @@ class OvershadowableManager> * @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor * versions than this. * @param fromState the state to search for overshadowed groups. - * * @return a list of found atomicUpdateGroups. It could be empty if no groups are found. */ @VisibleForTesting List> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState) { final TreeMap>> stateMap = getStateMap(fromState); - Entry>> current = findLowestOverlappingEntry( - rangeOfAug, - stateMap, - false - ); - - if (current == null) { - return Collections.emptyList(); - } + final Iterator>>> iterator = + entryIteratorSmallerThan(rangeOfAug.endPartitionId, stateMap); // Going through the map to find all entries of the RootPartitionRange contains the given rangeOfAug. // Note that RootPartitionRange of entries are always consecutive. final List> found = new ArrayList<>(); - while (current != null && current.getKey().overlaps(rangeOfAug)) { + while (iterator.hasNext()) { + final Entry>> current = iterator.next(); + if (!current.getKey().overlaps(rangeOfAug)) { + break; + } if (current.getKey().contains(rangeOfAug)) { // versionToGroup is sorted by minorVersion. // versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher @@ -352,63 +363,62 @@ class OvershadowableManager> found.addAll(versionToGroup.tailMap(minorVersion).values()); } } - current = stateMap.higherEntry(current.getKey()); } return found; } - /** - * Finds the lowest entry overlapping with the given root partition range. - * It first searches the entries lower than or equal to the given range. - * If there's no such entry lower than the given range, then it searches the entries higher than the given range. - * - * @return an entry of the lowest key overlapping with the given range. Otherwise null. - */ - @Nullable - private Entry>> findLowestOverlappingEntry( - RootPartitionRange rangeOfAug, - TreeMap>> stateMap, - boolean strictSameStartId - ) + boolean isOvershadowedByVisibleGroup(RootPartitionRange partitionRange, short minorVersion) { - // Searches the entries lower than or equal to the given range. - Entry>> current = stateMap.floorEntry(rangeOfAug); + final Iterator>>> iterator = + entryIteratorSmallerThan(partitionRange.endPartitionId, visibleGroupPerRange); - if (current == null) { - // Searches the entries higher than then given range. - current = stateMap.higherEntry(rangeOfAug); - } - - if (current == null) { - return null; - } - - // floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys. - while (current != null && !current.getKey().overlaps(rangeOfAug)) { - current = stateMap.higherEntry(current.getKey()); - } - - final BiPredicate predicate; - if (strictSameStartId) { - predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId; - } else { - predicate = RootPartitionRange::overlaps; - } - - // There could be multiple entries of the same startPartitionId but different endPartitionId. - // Find the first key of the same startPartitionId which has the lowest endPartitionId. - while (current != null) { - final Entry>> lowerEntry = stateMap.lowerEntry( - current.getKey() - ); - if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) { - current = lowerEntry; - } else { + // Going through the map to find all entries of the RootPartitionRange contains the given rangeOfAug. + // Note that RootPartitionRange of entries are always consecutive. + while (iterator.hasNext()) { + final Entry>> current = iterator.next(); + if (!current.getKey().overlaps(partitionRange)) { break; } + if (current.getKey().contains(partitionRange)) { + // versionToGroup is sorted by minorVersion. + // versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher + // minorVersions than the given minorVersion. + final Short2ObjectSortedMap> versionToGroup = current.getValue(); + if (versionToGroup.lastShortKey() > minorVersion) { + return true; + } + } } + return false; + } - return current; + /** + * Returns an iterator of entries that has a {@link RootPartitionRange} smaller than the given partitionId. + * A RootPartitionRange is smaller than a partitionId if {@link RootPartitionRange#startPartitionId} < partitionId. + */ + private Iterator>>> entryIteratorSmallerThan( + short partitionId, + TreeMap>> stateMap + ) + { + final RootPartitionRange lowFench = new RootPartitionRange((short) 0, (short) 0); + final RootPartitionRange highFence = new RootPartitionRange(partitionId, partitionId); + return stateMap.subMap(lowFench, false, highFence, false).descendingMap().entrySet().iterator(); + } + + /** + * Returns an iterator of entries that has a {@link RootPartitionRange} greater than the given partitionId. + * A RootPartitionRange is greater than a partitionId if {@link RootPartitionRange#startPartitionId} >= partitionId + * and {@link RootPartitionRange#endPartitionId} > partitionId. + */ + private Iterator>>> entryIteratorGreaterThan( + short partitionId, + TreeMap>> stateMap + ) + { + final RootPartitionRange lowFench = new RootPartitionRange(partitionId, partitionId); + final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE); + return stateMap.subMap(lowFench, false, highFence, false).entrySet().iterator(); } /** @@ -539,13 +549,14 @@ class OvershadowableManager> if (!isOvershadowingGroupsFull) { // Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug // and are fully available. - final List> latestFullGroups = groupsOvershadowingAug - .stream() - .flatMap(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups( + //noinspection ConstantConditions + final List> latestFullGroups = FluentIterable + .from(groupsOvershadowingAug) + .transformAndConcat(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups( RootPartitionRange.of(eachFullgroup), - eachFullgroup.getMinorVersion()).stream() - ) - .collect(Collectors.toList()); + eachFullgroup.getMinorVersion() + )) + .toList(); if (!latestFullGroups.isEmpty()) { final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange( @@ -569,12 +580,11 @@ class OvershadowableManager> * - All groups must be full. * - All groups must be adjacent. * - The lowest startPartitionId and the highest endPartitionId must be same with the given startPartitionId and - * the given endPartitionId, respectively. + * the given endPartitionId, respectively. * * @param groups atomicUpdateGroups sorted by their rootPartitionRange * @param startRootPartitionId the start partitionId of the root partition range to check the coverage * @param endRootPartitionId the end partitionId of the root partition range to check the coverage - * * @return true if the given groups fully cover the given partition range. */ private boolean doGroupsFullyCoverPartitionRange( @@ -675,11 +685,10 @@ class OvershadowableManager> final AtomicUpdateGroup newAtomicUpdateGroup = new AtomicUpdateGroup<>(chunk); // Decide the initial state of the new atomicUpdateGroup - final boolean overshadowed = visibleGroupPerRange - .values() - .stream() - .flatMap(map -> map.values().stream()) - .anyMatch(group -> group.overshadows(newAtomicUpdateGroup)); + final boolean overshadowed = isOvershadowedByVisibleGroup( + RootPartitionRange.of(newAtomicUpdateGroup), + newAtomicUpdateGroup.getMinorVersion() + ); if (overshadowed) { addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED, true); @@ -724,38 +733,35 @@ class OvershadowableManager> if (!latestFullAugs.isEmpty()) { // The current visible atomicUpdateGroup becomes standby // and the fully available overshadowed atomicUpdateGroups become visible - final Set> overshadowsLatestFullAugsInVisible = latestFullAugs - .stream() - .flatMap(group -> findOvershadows(group, State.VISIBLE).stream()) - .collect(Collectors.toSet()); + final Set> overshadowsLatestFullAugsInVisible = FluentIterable + .from(latestFullAugs) + .transformAndConcat(group -> findOvershadows(group, State.VISIBLE)) + .toSet(); replaceVisibleWith( overshadowsLatestFullAugsInVisible, State.STANDBY, latestFullAugs, State.OVERSHADOWED ); - latestFullAugs - .stream() - .flatMap(group -> findOvershadows(group, State.OVERSHADOWED).stream()) - .collect(Collectors.toSet()) - .forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY)); + FluentIterable.from(latestFullAugs) + .transformAndConcat(group -> findOvershadows(group, State.OVERSHADOWED)) + .forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY)); } else { // Find the latest non-fully available atomicUpdateGroups final List> latestStandby = findLatestNonFullyAvailableAtomicUpdateGroups( findOvershadows(rangeOfAug, minorVersion, State.STANDBY) ); if (!latestStandby.isEmpty()) { - final List> overshadowedByLatestStandby = latestStandby - .stream() - .flatMap(group -> findOvershadowedBy(group, State.VISIBLE).stream()) - .collect(Collectors.toList()); + final List> overshadowedByLatestStandby = FluentIterable + .from(latestStandby) + .transformAndConcat(group -> findOvershadowedBy(group, State.VISIBLE)) + .toList(); replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY); // All standby groups overshadowed by the new visible group should be moved to overshadowed - latestStandby - .stream() - .flatMap(group -> findOvershadowedBy(group, State.STANDBY).stream()) - .collect(Collectors.toSet()) + FluentIterable + .from(latestStandby) + .transformAndConcat(group -> findOvershadowedBy(group, State.STANDBY)) .forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED)); } else if (augOfRemovedChunk.isEmpty()) { // Visible is empty. Move the latest overshadowed to visible. @@ -785,15 +791,27 @@ class OvershadowableManager> return Collections.emptyList(); } - final OvershadowableManager manager = new OvershadowableManager<>(groups); - if (!manager.standbyGroups.isEmpty()) { - throw new ISE("This method should be called only when there is no fully available group in the given state."); + final TreeMap> rangeToGroup = new TreeMap<>(); + for (AtomicUpdateGroup group : groups) { + rangeToGroup.put(RootPartitionRange.of(group), group); } - final List> visibles = new ArrayList<>(); - for (Short2ObjectSortedMap> map : manager.visibleGroupPerRange.values()) { - visibles.addAll(map.values()); + // rangeToGroup is sorted by RootPartitionRange which means, the groups of the wider range will appear later + // in the rangeToGroup map. Since the wider groups have higer minor versions than narrower groups, + // we iterate the rangeToGroup from the last entry in descending order. + Entry> currEntry = rangeToGroup.lastEntry(); + while (currEntry != null) { + final Entry> lowerEntry = rangeToGroup.lowerEntry(currEntry.getKey()); + if (lowerEntry != null) { + if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) { + return Collections.emptyList(); + } + } + visibles.add(currEntry.getValue()); + currEntry = lowerEntry; } + // visibles should be sorted. + visibles.sort(Comparator.comparing(RootPartitionRange::of)); return visibles; } @@ -807,27 +825,38 @@ class OvershadowableManager> minorVersion, State.OVERSHADOWED ); - if (overshadowedGroups.isEmpty()) { + + // Filter out non-fully available groups. + final TreeMap> fullGroups = new TreeMap<>(); + for (AtomicUpdateGroup group : FluentIterable.from(overshadowedGroups).filter(AtomicUpdateGroup::isFull)) { + fullGroups.put(RootPartitionRange.of(group), group); + } + if (fullGroups.isEmpty()) { + return Collections.emptyList(); + } + if (fullGroups.firstKey().startPartitionId != rangeOfAug.startPartitionId + || fullGroups.lastKey().endPartitionId != rangeOfAug.endPartitionId) { return Collections.emptyList(); } - final OvershadowableManager manager = new OvershadowableManager<>(overshadowedGroups); + // Find latest fully available groups. final List> visibles = new ArrayList<>(); - for (Short2ObjectSortedMap> map : manager.visibleGroupPerRange.values()) { - for (AtomicUpdateGroup atomicUpdateGroup : map.values()) { - if (!atomicUpdateGroup.isFull()) { + // fullGroups is sorted by RootPartitionRange which means, the groups of the wider range will appear later + // in the fullGroups map. Since the wider groups have higer minor versions than narrower groups, + // we iterate the fullGroups from the last entry in descending order. + Entry> currEntry = fullGroups.lastEntry(); + while (currEntry != null) { + final Entry> lowerEntry = fullGroups.lowerEntry(currEntry.getKey()); + if (lowerEntry != null) { + if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) { return Collections.emptyList(); } - visibles.add(atomicUpdateGroup); } + visibles.add(currEntry.getValue()); + currEntry = lowerEntry; } - final RootPartitionRange foundRange = RootPartitionRange.of( - visibles.get(0).getStartRootPartitionId(), - visibles.get(visibles.size() - 1).getEndRootPartitionId() - ); - if (!rangeOfAug.equals(foundRange)) { - return Collections.emptyList(); - } + // visibles should be sorted. + visibles.sort(Comparator.comparing(RootPartitionRange::of)); return visibles; } @@ -896,10 +925,14 @@ class OvershadowableManager> public boolean isComplete() { - return visibleGroupPerRange - .values() - .stream() - .allMatch(map -> Iterables.getOnlyElement(map.values()).isFull()); + return Iterators.all( + visibleGroupPerRange.values().iterator(), + map -> { + SingleEntryShort2ObjectSortedMap> singleMap = + (SingleEntryShort2ObjectSortedMap>) map; + //noinspection ConstantConditions + return singleMap.val.isFull(); + }); } @Nullable @@ -922,13 +955,18 @@ class OvershadowableManager> } } - Stream> createVisibleChunksStream() + Iterator> visibleChunksIterator() { - return visibleGroupPerRange - .values() - .stream() - .flatMap((Short2ObjectSortedMap> map) -> map.values().stream()) - .flatMap((AtomicUpdateGroup aug) -> aug.getChunks().stream()); + final FluentIterable>> versionToGroupIterable = FluentIterable.from( + visibleGroupPerRange.values() + ); + return versionToGroupIterable + .transformAndConcat(map -> { + SingleEntryShort2ObjectSortedMap> singleMap = + (SingleEntryShort2ObjectSortedMap>) map; + //noinspection ConstantConditions + return singleMap.val.getChunks(); + }).iterator(); } List> getOvershadowedChunks() @@ -1025,9 +1063,9 @@ class OvershadowableManager> public boolean overlaps(RootPartitionRange that) { return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId) - && Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId) - || Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId) - && Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId); + && Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId) + || Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId) + && Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId); } @Override @@ -1086,6 +1124,12 @@ class OvershadowableManager> val = null; } + private SingleEntryShort2ObjectSortedMap(short key, V val) + { + this.key = key; + this.val = val; + } + @Override public Short2ObjectSortedMap subMap(short fromKey, short toKey) { diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java index c39c4144693..289ac934825 100644 --- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java +++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java @@ -26,8 +26,6 @@ import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Spliterator; -import java.util.stream.Stream; /** * An object that clumps together multiple other objects which each represent a shard of some space. @@ -36,6 +34,18 @@ public class PartitionHolder> implements Iterable overshadowableManager; + public static > PartitionHolder copyWithOnlyVisibleChunks( + PartitionHolder partitionHolder + ) + { + return new PartitionHolder<>(OvershadowableManager.copyVisible(partitionHolder.overshadowableManager)); + } + + public static > PartitionHolder deepCopy(PartitionHolder partitionHolder) + { + return new PartitionHolder<>(OvershadowableManager.deepCopy(partitionHolder.overshadowableManager)); + } + public PartitionHolder(PartitionChunk initialChunk) { this.overshadowableManager = new OvershadowableManager<>(); @@ -50,9 +60,14 @@ public class PartitionHolder> implements Iterable partitionHolder) + protected PartitionHolder(OvershadowableManager overshadowableManager) { - this.overshadowableManager = new OvershadowableManager<>(partitionHolder.overshadowableManager); + this.overshadowableManager = overshadowableManager; + } + + public ImmutablePartitionHolder asImmutable() + { + return new ImmutablePartitionHolder<>(OvershadowableManager.copyVisible(overshadowableManager)); } public boolean add(PartitionChunk chunk) @@ -112,18 +127,7 @@ public class PartitionHolder> implements Iterable> iterator() { - return stream().iterator(); - } - - @Override - public Spliterator> spliterator() - { - return stream().spliterator(); - } - - public Stream> stream() - { - return overshadowableManager.createVisibleChunksStream(); + return overshadowableManager.visibleChunksIterator(); } public List> getOvershadowed() diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java index 3433e128fc6..0bb2354177b 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineSpecificDataTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.timeline.partition.ImmutablePartitionHolder; import org.apache.druid.timeline.partition.IntegerPartitionChunk; import org.apache.druid.timeline.partition.OvershadowableInteger; import org.apache.druid.timeline.partition.PartitionHolder; @@ -225,22 +224,22 @@ public class VersionedIntervalTimelineSpecificDataTest extends VersionedInterval public void testFindEntry() { Assert.assertEquals( - new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))), + new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1") ); Assert.assertEquals( - new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))), + new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1") ); Assert.assertEquals( - new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))), + new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1") ); Assert.assertEquals( - new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))), + new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1") ); diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java index c55e11369b9..57abe7a8108 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.timeline.partition.ImmutablePartitionHolder; import org.apache.druid.timeline.partition.IntegerPartitionChunk; import org.apache.druid.timeline.partition.OvershadowableInteger; import org.apache.druid.timeline.partition.PartitionHolder; @@ -58,7 +57,7 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest add("2011-01-02/2011-01-05", "2", 1); Assert.assertEquals( - new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))), + new PartitionHolder<>(makeSingle("1", 1)).asImmutable(), timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1") ); } @@ -1407,9 +1406,7 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest new PartitionHolder<>( ImmutableList.of( makeNumbered("1", 0, 0), - makeNumbered("1", 1, 0), - makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3), - makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3) + makeNumbered("1", 1, 0) ) ) ) @@ -1500,4 +1497,79 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest Assert.assertEquals(2, Lists.newArrayList(overshadowableIntegers.iterator()).size()); } + @Test + public void testFindNonOvershadowedObjectsInIntervalWithOnlyCompletePartitionsReturningValidResult() + { + // 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 0, 0)); + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 1, 0)); + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 2, 0)); + + // 2019-01-02/2019-01-03 + add("2019-01-02/2019-01-03", "0", makeNumbered("0", 0, 0)); + add("2019-01-02/2019-01-03", "0", makeNumbered("0", 1, 0)); + + // Incomplete partitions + add("2019-01-03/2019-01-04", "0", makeNumbered("2", 0, 3, 0)); + add("2019-01-03/2019-01-04", "0", makeNumbered("2", 1, 3, 0)); + + // Overwrite 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "1", makeNumbered("1", 0, 0)); + add("2019-01-01/2019-01-02", "1", makeNumbered("1", 1, 0)); + + // Overwrite 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3)); + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3)); + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3)); + + Assert.assertEquals( + ImmutableSet.of( + makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3).getObject(), + makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3).getObject(), + makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3).getObject(), + makeNumbered("0", 0, 0).getObject(), + makeNumbered("0", 1, 0).getObject() + ), + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2019-01-01/2019-01-04"), Partitions.ONLY_COMPLETE) + ); + } + + @Test + public void testFindNonOvershadowedObjectsInIntervalWithIncompleteOkReturningValidResult() + { + // 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 0, 0)); + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 1, 0)); + add("2019-01-01/2019-01-02", "0", makeNumbered("0", 2, 0)); + + // 2019-01-02/2019-01-03 + add("2019-01-02/2019-01-03", "0", makeNumbered("0", 0, 0)); + add("2019-01-02/2019-01-03", "0", makeNumbered("0", 1, 0)); + + // Incomplete partitions + add("2019-01-03/2019-01-04", "0", makeNumbered("2", 0, 3, 0)); + add("2019-01-03/2019-01-04", "0", makeNumbered("2", 1, 3, 0)); + + // Overwrite 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "1", makeNumbered("1", 0, 0)); + add("2019-01-01/2019-01-02", "1", makeNumbered("1", 1, 0)); + + // Overwrite 2019-01-01/2019-01-02 + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3)); + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3)); + add("2019-01-01/2019-01-02", "1", makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3)); + + Assert.assertEquals( + ImmutableSet.of( + makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3).getObject(), + makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3).getObject(), + makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3).getObject(), + makeNumbered("0", 0, 0).getObject(), + makeNumbered("0", 1, 0).getObject(), + makeNumbered("2", 0, 3, 0).getObject(), + makeNumbered("2", 1, 3, 0).getObject() + ), + timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2019-01-01/2019-01-04"), Partitions.INCOMPLETE_OK) + ); + } } diff --git a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java index d3a62403909..3d8c5a34925 100644 --- a/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java +++ b/core/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTestBase.java @@ -19,6 +19,7 @@ package org.apache.druid.timeline; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -107,7 +108,10 @@ public class VersionedIntervalTimelineTestBase void checkRemove() { for (TimelineObjectHolder holder : timeline.findFullyOvershadowed()) { - for (PartitionChunk chunk : holder.getObject()) { + // Copy chunks to avoid the ConcurrentModificationException. + // Note that timeline.remove() modifies the PartitionHolder. + List> chunks = FluentIterable.from(holder.getObject()).toList(); + for (PartitionChunk chunk : chunks) { timeline.remove(holder.getInterval(), holder.getVersion(), chunk); } } @@ -154,26 +158,36 @@ public class VersionedIntervalTimelineTestBase ); } - PartitionChunk makeSingle(String majorVersion, int value) + public static PartitionChunk makeSingle(String majorVersion, int value) { return makeSingle(majorVersion, 0, value); } - private PartitionChunk makeSingle(String majorVersion, int partitionNum, int val) + public static PartitionChunk makeSingle(String majorVersion, int partitionNum, int val) { return new SingleElementPartitionChunk<>(new OvershadowableInteger(majorVersion, partitionNum, val)); } - PartitionChunk makeNumbered(String majorVersion, int partitionNum, int val) + public static PartitionChunk makeNumbered(String majorVersion, int partitionNum, int val) + { + return makeNumbered(majorVersion, partitionNum, 0, val); + } + + public static PartitionChunk makeNumbered( + String majorVersion, + int partitionNum, + int chunks, + int val + ) { return new NumberedPartitionChunk<>( partitionNum, - 0, + chunks, new OvershadowableInteger(majorVersion, partitionNum, val) ); } - PartitionChunk makeNumberedOverwriting( + public static PartitionChunk makeNumberedOverwriting( String majorVersion, int partitionNumOrdinal, int val, diff --git a/core/src/test/java/org/apache/druid/timeline/partition/AtomicUpdateGroupTest.java b/core/src/test/java/org/apache/druid/timeline/partition/AtomicUpdateGroupTest.java new file mode 100644 index 00000000000..1a7cd779a93 --- /dev/null +++ b/core/src/test/java/org/apache/druid/timeline/partition/AtomicUpdateGroupTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline.partition; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.timeline.VersionedIntervalTimelineTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.util.stream.IntStream; + +public class AtomicUpdateGroupTest +{ + @Test + public void testCopy() + { + AtomicUpdateGroup original = new AtomicUpdateGroup<>( + VersionedIntervalTimelineTestBase.makeNumberedOverwriting( + "0", + 0, + 0, + 0, + 10, + 0, + 10 + ) + ); + IntStream.range(1, 10).forEach( + i -> original.add( + VersionedIntervalTimelineTestBase.makeNumberedOverwriting( + "0", + i, + 0, + 0, + 10, + 0, + 10 + ) + ) + ); + + Assert.assertEquals(AtomicUpdateGroup.copy(original), original); + } + + @Test + public void testEqualAndHashCodeContract() + { + EqualsVerifier.forClass(AtomicUpdateGroup.class).usingGetClass().verify(); + } +} diff --git a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java index a48cef7d0da..92f0fed47b4 100644 --- a/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java +++ b/core/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java @@ -20,7 +20,9 @@ package org.apache.druid.timeline.partition; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.timeline.partition.OvershadowableManager.RootPartitionRange; import org.apache.druid.timeline.partition.OvershadowableManager.State; import org.junit.Assert; @@ -63,6 +65,63 @@ public class OvershadowableManagerTest expectedStandbyChunks = new ArrayList<>(); } + @Test + public void testCopyVisible() + { + // chunks of partition id 0 and 1 + manager.addChunk(newRootChunk()); + manager.addChunk(newRootChunk()); + + // chunks to overshadow the partition id range [0, 2) + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + + // chunks of partition id 3 and 4 + manager.addChunk(newRootChunk()); + manager.addChunk(newRootChunk()); + + // standby chunk + manager.addChunk(newNonRootChunk(2, 4, 1, 3)); + + OvershadowableManager copy = OvershadowableManager.copyVisible(manager); + Assert.assertTrue(copy.getOvershadowedChunks().isEmpty()); + Assert.assertTrue(copy.getStandbyChunks().isEmpty()); + Assert.assertEquals( + Lists.newArrayList(manager.visibleChunksIterator()), + Lists.newArrayList(copy.visibleChunksIterator()) + ); + } + + @Test + public void testDeepCopy() + { + // chunks of partition id 0 and 1 + manager.addChunk(newRootChunk()); + manager.addChunk(newRootChunk()); + + // chunks to overshadow the partition id range [0, 2) + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + manager.addChunk(newNonRootChunk(0, 2, 1, 3)); + + // chunks of partition id 3 and 4 + manager.addChunk(newRootChunk()); + manager.addChunk(newRootChunk()); + + // standby chunk + manager.addChunk(newNonRootChunk(2, 4, 1, 3)); + + OvershadowableManager copy = OvershadowableManager.deepCopy(manager); + Assert.assertEquals(manager, copy); + } + + @Test + public void testEqualAndHashCodeContract() + { + EqualsVerifier.forClass(OvershadowableManager.class).usingGetClass().verify(); + } + @Test public void testFindOvershadowedBy() { @@ -981,7 +1040,7 @@ public class OvershadowableManagerTest Assert.assertEquals( "Mismatched visible chunks", new HashSet<>(expectedVisibleChunks), - Sets.newHashSet(manager.createVisibleChunksStream().iterator()) + Sets.newHashSet(manager.visibleChunksIterator()) ); Assert.assertEquals( "Mismatched overshadowed chunks", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index a380ac8ae5e..9f1dc52c1ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; +import com.google.common.collect.FluentIterable; import org.apache.commons.io.FileUtils; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.data.input.InputRow; @@ -65,6 +66,7 @@ import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.SegmentAllocator; import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.joda.time.Interval; @@ -223,11 +225,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask final Set allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments()); allSegments.addAll(pushedSegments); final VersionedIntervalTimeline timeline = VersionedIntervalTimeline.forSegments(allSegments); - final Set oldSegments = timeline.findFullyOvershadowed() - .stream() - .flatMap(holder -> holder.getObject().stream()) - .map(PartitionChunk::getObject) - .collect(Collectors.toSet()); + final Set oldSegments = FluentIterable.from(timeline.findFullyOvershadowed()) + .transformAndConcat(TimelineObjectHolder::getObject) + .transform(PartitionChunk::getObject) + .toSet(); taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), oldSegments, pushedSegments)); return TaskStatus.success(getId()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 191f47e7357..8fc214263b1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -25,10 +25,10 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterators; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.AbstractInputSource; -import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -171,15 +171,16 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); final List> timeline = createTimeline(); - final Stream entityStream = timeline - .stream() - .flatMap(holder -> { + final Iterator entityIterator = FluentIterable + .from(timeline) + .transformAndConcat(holder -> { + //noinspection ConstantConditions final PartitionHolder partitionHolder = holder.getObject(); - return partitionHolder - .stream() - .map(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); - }); - + //noinspection ConstantConditions + return FluentIterable + .from(partitionHolder) + .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); + }).iterator(); final List effectiveDimensions; if (dimensions == null) { effectiveDimensions = ReingestionTimelineUtils.getUniqueDimensions( @@ -209,7 +210,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI return new InputEntityIteratingReader( inputRowSchema, inputFormat, - entityStream.iterator(), + entityIterator, temporaryDirectory ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 8a2f81dd60b..7b1baa9bc1f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -168,8 +168,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv runTestTask(inputInterval, Granularities.DAY); final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval; - final Collection allSegments = - getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE); + final Collection allSegments = new HashSet<>( + getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE) + ); // Reingest the same data. Each segment should get replaced by a segment with a newer version. runTestTask(inputInterval, secondSegmentGranularity); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index e8cf277a075..1e8a0e9d59b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -19,6 +19,7 @@ package org.apache.druid.tests.indexer; +import com.google.common.collect.FluentIterable; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask; @@ -250,11 +251,13 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest ); final List> holders = timeline.lookup(Intervals.ETERNITY); - return holders - .stream() - .flatMap(holder -> holder.getObject().stream()) - .anyMatch(chunk -> oldVersions.stream() - .anyMatch(oldSegment -> chunk.getObject().overshadows(oldSegment))); + return FluentIterable + .from(holders) + .transformAndConcat(TimelineObjectHolder::getObject) + .anyMatch( + chunk -> FluentIterable.from(oldVersions) + .anyMatch(oldSegment -> chunk.getObject().overshadows(oldSegment)) + ); }, "See a new version" ); diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java index 1cca2b9aed0..6041f2ba16d 100644 --- a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java +++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java @@ -169,5 +169,4 @@ public class DataSourcesSnapshot } return overshadowedSegments; } - } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 882fd8c7e3a..5c48598880b 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -22,6 +22,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -71,13 +72,11 @@ import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.StreamSupport; /** */ @@ -811,9 +810,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor return null; } else { - if (existingChunks - .stream() - .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)) + //noinspection ConstantConditions + if (FluentIterable + .from(existingChunks) + .transformAndConcat(TimelineObjectHolder::getObject) .anyMatch(chunk -> !chunk.getObject().getShardSpec().isCompatible(partialShardSpec.getShardSpecClass()))) { // All existing segments should have a compatible shardSpec with partialShardSpec. return null; @@ -825,15 +825,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor if (!existingChunks.isEmpty()) { TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - maxId = StreamSupport - .stream(existingHolder.getObject().spliterator(), false) + //noinspection ConstantConditions + for (DataSegment segment : FluentIterable + .from(existingHolder.getObject()) + .transform(PartitionChunk::getObject) // Here we check only the segments of the same shardSpec to find out the max partitionId. // Note that OverwriteShardSpec has the higher range for partitionId than others. // See PartitionIds. - .filter(chunk -> chunk.getObject().getShardSpec().getClass() == partialShardSpec.getShardSpecClass()) - .max(Comparator.comparing(chunk -> chunk.getObject().getShardSpec().getPartitionNum())) - .map(chunk -> SegmentIdWithShardSpec.fromDataSegment(chunk.getObject())) - .orElse(null); + .filter(segment -> segment.getShardSpec().getClass() == partialShardSpec.getShardSpecClass())) { + // Don't use the stream API for performance. + if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + maxId = SegmentIdWithShardSpec.fromDataSegment(segment); + } + } } final List pendings = getPendingSegmentsForIntervalWithHandle( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index b4b4f235b05..78ab09ce216 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -22,10 +22,12 @@ package org.apache.druid.server.coordinator.duty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.guava.Comparators; @@ -54,7 +56,6 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.PriorityQueue; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; /** * This class iterates all segments of the dataSources configured for compaction from the newest to the oldest. @@ -119,13 +120,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator final List> holders = timeline.lookup(interval); - resultMap.put( - entry.getDataSource(), - holders.stream() - .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false)) - .mapToLong(chunk -> chunk.getObject().getSize()) - .sum() - ); + long size = 0; + for (DataSegment segment : FluentIterable + .from(holders) + .transformAndConcat(TimelineObjectHolder::getObject) + .transform(PartitionChunk::getObject)) { + size += segment.getSize(); + } + + resultMap.put(entry.getDataSource(), size); } return resultMap; } @@ -237,11 +240,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator if (holders.isEmpty()) { throw new NoSuchElementException(); } - return holders.remove(holders.size() - 1) - .getObject() - .stream() - .map(PartitionChunk::getObject) - .collect(Collectors.toList()); + return FluentIterable.from(holders.remove(holders.size() - 1).getObject()) + .transform(PartitionChunk::getObject) + .toList(); } } @@ -509,11 +510,16 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator private QueueEntry(List segments) { Preconditions.checkArgument(segments != null && !segments.isEmpty()); - Collections.sort(segments); - this.interval = new Interval( - segments.get(0).getInterval().getStart(), - segments.get(segments.size() - 1).getInterval().getEnd() - ); + DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN; + for (DataSegment segment : segments) { + if (segment.getInterval().getStart().compareTo(minStart) < 0) { + minStart = segment.getInterval().getStart(); + } + if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) { + maxEnd = segment.getInterval().getEnd(); + } + } + this.interval = new Interval(minStart, maxEnd); this.segments = segments; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 2024274417b..3baf47f8093 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -30,6 +30,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; @@ -44,6 +45,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.ShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; @@ -74,11 +76,16 @@ public class CompactSegmentsTest ) { Preconditions.checkArgument(segments.size() > 1); - Collections.sort(segments); - Interval compactInterval = new Interval( - segments.get(0).getInterval().getStart(), - segments.get(segments.size() - 1).getInterval().getEnd() - ); + DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN; + for (DataSegment segment : segments) { + if (segment.getInterval().getStart().compareTo(minStart) < 0) { + minStart = segment.getInterval().getStart(); + } + if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) { + maxEnd = segment.getInterval().getEnd(); + } + } + Interval compactInterval = new Interval(minStart, maxEnd); final VersionedIntervalTimeline timeline = dataSources.get(segments.get(0).getDataSource()); segments.forEach( segment -> timeline.remove(