Improve OvershadowableManager performance (#9441)

* Use the iterator instead of higherKey(); use the iterator API instead of stream

* Fix tests; fix a concurrency bug in timeline

* fix test

* add tests for findNonOvershadowedObjectsInInterval

* fix test

* add missing tests; fix a bug in QueueEntry

* equals tests

* fix test
This commit is contained in:
Jihoon Son 2020-03-10 13:22:19 -07:00 committed by GitHub
parent 7e0e767cc2
commit 7401bb3f93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 561 additions and 258 deletions

View File

@ -67,7 +67,7 @@ public class VersionedIntervalTimelineBenchmark
@Param({"10", "100", "1000"})
private int numInitialRootGenSegmentsPerInterval;
@Param({"1", "5"})
@Param({"1", "2"})
private int numNonRootGenerations;
@Param({"false", "true"})

View File

@ -22,12 +22,12 @@ package org.apache.druid.timeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.utils.CollectionUtils;
@ -158,14 +158,17 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
/**
* Computes a set with all objects falling within the specified interval which are at least partially "visible" in
* this interval (that is, are not fully overshadowed within this interval).
*
* Note that this method returns a set of {@link ObjectType}. Duplicate objects in different time chunks will be
* removed in the result.
*/
public Set<ObjectType> findNonOvershadowedObjectsInInterval(Interval interval, Partitions completeness)
{
return lookup(interval, completeness)
.stream()
.flatMap(timelineObjectHolder -> timelineObjectHolder.getObject().stream())
.map(PartitionChunk::getObject)
.collect(Collectors.toSet());
return FluentIterable
.from(lookup(interval, completeness))
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)
.toSet();
}
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
@ -278,7 +281,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
return new ImmutablePartitionHolder<>(foundEntry.getPartitionHolder());
return foundEntry.getPartitionHolder().asImmutable();
}
}
}
@ -362,7 +365,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
entry.getTrueInterval(),
entry.getTrueInterval(),
entry.getVersion(),
new PartitionHolder<>(entry.getPartitionHolder())
PartitionHolder.copyWithOnlyVisibleChunks(entry.getPartitionHolder())
);
}
@ -381,9 +384,13 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
final Set<TimelineObjectHolder<VersionType, ObjectType>> overshadowedObjects = overshadowedPartitionsTimeline
.values()
.stream()
.flatMap(
(Map<VersionType, TimelineEntry> entry) -> entry.values().stream().map(this::timelineEntryToObjectHolder)
)
.flatMap((Map<VersionType, TimelineEntry> entry) -> entry.values().stream())
.map(entry -> new TimelineObjectHolder<>(
entry.getTrueInterval(),
entry.getTrueInterval(),
entry.getVersion(),
PartitionHolder.deepCopy(entry.getPartitionHolder())
))
.collect(Collectors.toSet());
// 2. Visible timelineEntries can also have overshadowed objects. Add them to the result too.
@ -482,7 +489,12 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
if (versionCompare > 0) {
return false;
} else if (versionCompare == 0) {
if (timelineEntry.partitionHolder.stream().noneMatch(chunk -> chunk.getObject().overshadows(object))) {
// Intentionally use the Iterators API instead of the stream API for performance.
//noinspection ConstantConditions
final boolean nonOvershadowedObject = Iterators.all(
timelineEntry.partitionHolder.iterator(), chunk -> !chunk.getObject().overshadows(object)
);
if (nonOvershadowedObject) {
return false;
}
}
@ -724,7 +736,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshado
timelineInterval,
val.getTrueInterval(),
val.getVersion(),
new PartitionHolder<>(val.getPartitionHolder())
PartitionHolder.copyWithOnlyVisibleChunks(val.getPartitionHolder())
)
);
}

View File

@ -45,11 +45,21 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
// This may matter if there are a lot of segments to keep in memory as in brokers or the coordinator.
private final List<PartitionChunk<T>> chunks = new ArrayList<>();
/**
 * Creates a new group holding the same chunks as the given group.
 * The chunk objects are shared with the original group, but the new group keeps them in its own list.
 *
 * @param group the group to copy
 * @return a new group populated with the chunks of {@code group}
 */
static <T extends Overshadowable<T>> AtomicUpdateGroup<T> copy(AtomicUpdateGroup<T> group)
{
  final List<PartitionChunk<T>> sourceChunks = group.chunks;
  return new AtomicUpdateGroup<>(sourceChunks);
}
/**
 * Creates a group that initially holds only the given chunk.
 *
 * @param chunk the first chunk of this group
 */
AtomicUpdateGroup(PartitionChunk<T> chunk)
{
this.chunks.add(chunk);
}
/**
 * Creates a group populated with every element of the given list.
 * The elements are added to this group's own list, so the passed list object itself is not retained.
 *
 * @param chunks the chunks to add to the new group
 */
private AtomicUpdateGroup(List<PartitionChunk<T>> chunks)
{
this.chunks.addAll(chunks);
}
public void add(PartitionChunk<T> chunk)
{
if (isFull()) {

View File

@ -25,9 +25,9 @@ import org.apache.druid.timeline.Overshadowable;
*/
public class ImmutablePartitionHolder<T extends Overshadowable<T>> extends PartitionHolder<T>
{
public ImmutablePartitionHolder(PartitionHolder<T> partitionHolder)
protected ImmutablePartitionHolder(OvershadowableManager<T> overshadowableManager)
{
super(partitionHolder);
super(overshadowableManager);
}
@Override

View File

@ -21,7 +21,8 @@ package org.apache.druid.timeline.partition;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
import it.unimi.dsi.fastutil.objects.ObjectCollection;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
@ -44,7 +45,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -52,9 +55,6 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details.
@ -105,22 +105,46 @@ class OvershadowableManager<T extends Overshadowable<T>>
this.overshadowedGroups = new TreeMap<>();
}
OvershadowableManager(OvershadowableManager<T> other)
public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible(OvershadowableManager<T> original)
{
this.knownPartitionChunks = new HashMap<>(other.knownPartitionChunks);
this.standbyGroups = new TreeMap<>(other.standbyGroups);
this.visibleGroupPerRange = new TreeMap<>(other.visibleGroupPerRange);
this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups);
final OvershadowableManager<T> copy = new OvershadowableManager<>();
original.visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> {
// There should be only one group per partition range
final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
copy.visibleGroupPerRange.put(
partitionRange,
new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group))
);
});
return copy;
}
private OvershadowableManager(List<AtomicUpdateGroup<T>> groups)
public static <T extends Overshadowable<T>> OvershadowableManager<T> deepCopy(OvershadowableManager<T> original)
{
this();
for (AtomicUpdateGroup<T> entry : groups) {
for (PartitionChunk<T> chunk : entry.getChunks()) {
addChunk(chunk);
}
}
final OvershadowableManager<T> copy = copyVisible(original);
original.overshadowedGroups.forEach((partitionRange, versionToGroups) -> {
// There should be only one group per partition range
final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
copy.overshadowedGroups.put(
partitionRange,
new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group))
);
});
original.standbyGroups.forEach((partitionRange, versionToGroups) -> {
// There should be only one group per partition range
final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
copy.standbyGroups.put(
partitionRange,
new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group))
);
});
return copy;
}
private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state)
@ -168,7 +192,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
private void replaceVisibleWith(
Collection<AtomicUpdateGroup<T>> oldVisibleGroups,
State newStateOfOldVisibleGroup,
List<AtomicUpdateGroup<T>> newVisibleGroups,
Collection<AtomicUpdateGroup<T>> newVisibleGroups,
State oldStateOfNewVisibleGroups
)
{
@ -263,27 +287,17 @@ class OvershadowableManager<T extends Overshadowable<T>>
* @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions
* than this.
* @param fromState the state to search for overshadowed groups.
*
* @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
*/
@VisibleForTesting
List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
{
final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
rangeOfAug,
stateMap,
true
);
if (current == null) {
return Collections.emptyList();
}
// Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
// Note that RootPartitionRange of entries are always consecutive.
final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
while (current != null && rangeOfAug.overlaps(current.getKey())) {
final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> iterator =
entryIteratorGreaterThan(rangeOfAug.startPartitionId, stateMap);
while (iterator.hasNext()) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = iterator.next();
if (rangeOfAug.contains(current.getKey())) {
// versionToGroup is sorted by minorVersion.
// versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions
@ -296,8 +310,9 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (versionToGroup.firstShortKey() < minorVersion) {
found.addAll(versionToGroup.headMap(minorVersion).values());
}
} else {
break;
}
current = stateMap.higherEntry(current.getKey());
}
return found;
}
@ -318,27 +333,23 @@ class OvershadowableManager<T extends Overshadowable<T>>
* @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor
* versions than this.
* @param fromState the state to search for overshadowed groups.
*
* @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
*/
@VisibleForTesting
List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
{
final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
rangeOfAug,
stateMap,
false
);
if (current == null) {
return Collections.emptyList();
}
final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> iterator =
entryIteratorSmallerThan(rangeOfAug.endPartitionId, stateMap);
// Going through the map to find all entries whose RootPartitionRange contains the given rangeOfAug.
// Note that RootPartitionRange of entries are always consecutive.
final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
while (current != null && current.getKey().overlaps(rangeOfAug)) {
while (iterator.hasNext()) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = iterator.next();
if (!current.getKey().overlaps(rangeOfAug)) {
break;
}
if (current.getKey().contains(rangeOfAug)) {
// versionToGroup is sorted by minorVersion.
// versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher
@ -352,63 +363,62 @@ class OvershadowableManager<T extends Overshadowable<T>>
found.addAll(versionToGroup.tailMap(minorVersion).values());
}
}
current = stateMap.higherEntry(current.getKey());
}
return found;
}
/**
* Finds the lowest entry overlapping with the given root partition range.
* It first searches the entries lower than or equal to the given range.
* If there's no such entry lower than the given range, then it searches the entries higher than the given range.
*
* @return an entry of the lowest key overlapping with the given range. Otherwise null.
*/
@Nullable
private Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
RootPartitionRange rangeOfAug,
TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap,
boolean strictSameStartId
)
boolean isOvershadowedByVisibleGroup(RootPartitionRange partitionRange, short minorVersion)
{
// Searches the entries lower than or equal to the given range.
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
final Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> iterator =
entryIteratorSmallerThan(partitionRange.endPartitionId, visibleGroupPerRange);
if (current == null) {
// Searches the entries higher than the given range.
current = stateMap.higherEntry(rangeOfAug);
}
if (current == null) {
return null;
}
// floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys.
while (current != null && !current.getKey().overlaps(rangeOfAug)) {
current = stateMap.higherEntry(current.getKey());
}
final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
if (strictSameStartId) {
predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId;
} else {
predicate = RootPartitionRange::overlaps;
}
// There could be multiple entries of the same startPartitionId but different endPartitionId.
// Find the first key of the same startPartitionId which has the lowest endPartitionId.
while (current != null) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
current.getKey()
);
if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) {
current = lowerEntry;
} else {
// Going through the map to find all entries whose RootPartitionRange contains the given partitionRange.
// Note that RootPartitionRange of entries are always consecutive.
while (iterator.hasNext()) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = iterator.next();
if (!current.getKey().overlaps(partitionRange)) {
break;
}
if (current.getKey().contains(partitionRange)) {
// versionToGroup is sorted by minorVersion, so versionToGroup.lastShortKey() below is the highest
// minorVersion stored for this range. The given partitionRange is overshadowed if that version is
// higher than the given minorVersion.
final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
if (versionToGroup.lastShortKey() > minorVersion) {
return true;
}
}
}
return false;
}
return current;
/**
 * Returns an iterator over the entries of the given state map whose {@link RootPartitionRange} is smaller than
 * the given partitionId. The entries are iterated in descending key order.
 * A RootPartitionRange is smaller than a partitionId if {@link RootPartitionRange#startPartitionId} < partitionId.
 *
 * @param partitionId the exclusive upper bound of the partition IDs to search
 * @param stateMap    the map to search
 */
private Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> entryIteratorSmallerThan(
    short partitionId,
    TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
)
{
  // Both fences are exclusive bounds of the sub map searched below.
  final RootPartitionRange lowFence = new RootPartitionRange((short) 0, (short) 0);
  final RootPartitionRange highFence = new RootPartitionRange(partitionId, partitionId);
  return stateMap
      .subMap(lowFence, false, highFence, false)
      .descendingMap()
      .entrySet()
      .iterator();
}
/**
 * Returns an iterator over the entries of the given state map whose {@link RootPartitionRange} is greater than
 * the given partitionId. The entries are iterated in ascending key order.
 * A RootPartitionRange is greater than a partitionId if {@link RootPartitionRange#startPartitionId} >= partitionId
 * and {@link RootPartitionRange#endPartitionId} > partitionId.
 *
 * @param partitionId the lower bound of the partition IDs to search
 * @param stateMap    the map to search
 */
private Iterator<Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>>> entryIteratorGreaterThan(
    short partitionId,
    TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
)
{
  // Both fences are exclusive bounds of the sub map searched below.
  final RootPartitionRange lowFence = new RootPartitionRange(partitionId, partitionId);
  final RootPartitionRange highFence = new RootPartitionRange(Short.MAX_VALUE, Short.MAX_VALUE);
  return stateMap
      .subMap(lowFence, false, highFence, false)
      .entrySet()
      .iterator();
}
/**
@ -539,13 +549,14 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (!isOvershadowingGroupsFull) {
// Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug
// and are fully available.
final List<AtomicUpdateGroup<T>> latestFullGroups = groupsOvershadowingAug
.stream()
.flatMap(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
//noinspection ConstantConditions
final List<AtomicUpdateGroup<T>> latestFullGroups = FluentIterable
.from(groupsOvershadowingAug)
.transformAndConcat(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
RootPartitionRange.of(eachFullgroup),
eachFullgroup.getMinorVersion()).stream()
)
.collect(Collectors.toList());
eachFullgroup.getMinorVersion()
))
.toList();
if (!latestFullGroups.isEmpty()) {
final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange(
@ -574,7 +585,6 @@ class OvershadowableManager<T extends Overshadowable<T>>
* @param groups atomicUpdateGroups sorted by their rootPartitionRange
* @param startRootPartitionId the start partitionId of the root partition range to check the coverage
* @param endRootPartitionId the end partitionId of the root partition range to check the coverage
*
* @return true if the given groups fully cover the given partition range.
*/
private boolean doGroupsFullyCoverPartitionRange(
@ -675,11 +685,10 @@ class OvershadowableManager<T extends Overshadowable<T>>
final AtomicUpdateGroup<T> newAtomicUpdateGroup = new AtomicUpdateGroup<>(chunk);
// Decide the initial state of the new atomicUpdateGroup
final boolean overshadowed = visibleGroupPerRange
.values()
.stream()
.flatMap(map -> map.values().stream())
.anyMatch(group -> group.overshadows(newAtomicUpdateGroup));
final boolean overshadowed = isOvershadowedByVisibleGroup(
RootPartitionRange.of(newAtomicUpdateGroup),
newAtomicUpdateGroup.getMinorVersion()
);
if (overshadowed) {
addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED, true);
@ -724,20 +733,18 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (!latestFullAugs.isEmpty()) {
// The current visible atomicUpdateGroup becomes standby
// and the fully available overshadowed atomicUpdateGroups become visible
final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = latestFullAugs
.stream()
.flatMap(group -> findOvershadows(group, State.VISIBLE).stream())
.collect(Collectors.toSet());
final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = FluentIterable
.from(latestFullAugs)
.transformAndConcat(group -> findOvershadows(group, State.VISIBLE))
.toSet();
replaceVisibleWith(
overshadowsLatestFullAugsInVisible,
State.STANDBY,
latestFullAugs,
State.OVERSHADOWED
);
latestFullAugs
.stream()
.flatMap(group -> findOvershadows(group, State.OVERSHADOWED).stream())
.collect(Collectors.toSet())
FluentIterable.from(latestFullAugs)
.transformAndConcat(group -> findOvershadows(group, State.OVERSHADOWED))
.forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY));
} else {
// Find the latest non-fully available atomicUpdateGroups
@ -745,17 +752,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
findOvershadows(rangeOfAug, minorVersion, State.STANDBY)
);
if (!latestStandby.isEmpty()) {
final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = latestStandby
.stream()
.flatMap(group -> findOvershadowedBy(group, State.VISIBLE).stream())
.collect(Collectors.toList());
final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = FluentIterable
.from(latestStandby)
.transformAndConcat(group -> findOvershadowedBy(group, State.VISIBLE))
.toList();
replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY);
// All standby groups overshadowed by the new visible group should be moved to overshadowed
latestStandby
.stream()
.flatMap(group -> findOvershadowedBy(group, State.STANDBY).stream())
.collect(Collectors.toSet())
FluentIterable
.from(latestStandby)
.transformAndConcat(group -> findOvershadowedBy(group, State.STANDBY))
.forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED));
} else if (augOfRemovedChunk.isEmpty()) {
// Visible is empty. Move the latest overshadowed to visible.
@ -785,15 +791,27 @@ class OvershadowableManager<T extends Overshadowable<T>>
return Collections.emptyList();
}
final OvershadowableManager<T> manager = new OvershadowableManager<>(groups);
if (!manager.standbyGroups.isEmpty()) {
throw new ISE("This method should be called only when there is no fully available group in the given state.");
final TreeMap<RootPartitionRange, AtomicUpdateGroup<T>> rangeToGroup = new TreeMap<>();
for (AtomicUpdateGroup<T> group : groups) {
rangeToGroup.put(RootPartitionRange.of(group), group);
}
final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroupPerRange.values()) {
visibles.addAll(map.values());
// rangeToGroup is sorted by RootPartitionRange which means, the groups of the wider range will appear later
// in the rangeToGroup map. Since the wider groups have higher minor versions than narrower groups,
// we iterate the rangeToGroup from the last entry in descending order.
Entry<RootPartitionRange, AtomicUpdateGroup<T>> currEntry = rangeToGroup.lastEntry();
while (currEntry != null) {
final Entry<RootPartitionRange, AtomicUpdateGroup<T>> lowerEntry = rangeToGroup.lowerEntry(currEntry.getKey());
if (lowerEntry != null) {
if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) {
return Collections.emptyList();
}
}
visibles.add(currEntry.getValue());
currEntry = lowerEntry;
}
// visibles should be sorted.
visibles.sort(Comparator.comparing(RootPartitionRange::of));
return visibles;
}
@ -807,27 +825,38 @@ class OvershadowableManager<T extends Overshadowable<T>>
minorVersion,
State.OVERSHADOWED
);
if (overshadowedGroups.isEmpty()) {
// Filter out non-fully available groups.
final TreeMap<RootPartitionRange, AtomicUpdateGroup<T>> fullGroups = new TreeMap<>();
for (AtomicUpdateGroup<T> group : FluentIterable.from(overshadowedGroups).filter(AtomicUpdateGroup::isFull)) {
fullGroups.put(RootPartitionRange.of(group), group);
}
if (fullGroups.isEmpty()) {
return Collections.emptyList();
}
if (fullGroups.firstKey().startPartitionId != rangeOfAug.startPartitionId
|| fullGroups.lastKey().endPartitionId != rangeOfAug.endPartitionId) {
return Collections.emptyList();
}
final OvershadowableManager<T> manager = new OvershadowableManager<>(overshadowedGroups);
// Find latest fully available groups.
final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroupPerRange.values()) {
for (AtomicUpdateGroup<T> atomicUpdateGroup : map.values()) {
if (!atomicUpdateGroup.isFull()) {
// fullGroups is sorted by RootPartitionRange which means, the groups of the wider range will appear later
// in the fullGroups map. Since the wider groups have higher minor versions than narrower groups,
// we iterate the fullGroups from the last entry in descending order.
Entry<RootPartitionRange, AtomicUpdateGroup<T>> currEntry = fullGroups.lastEntry();
while (currEntry != null) {
final Entry<RootPartitionRange, AtomicUpdateGroup<T>> lowerEntry = fullGroups.lowerEntry(currEntry.getKey());
if (lowerEntry != null) {
if (lowerEntry.getKey().endPartitionId != currEntry.getKey().startPartitionId) {
return Collections.emptyList();
}
visibles.add(atomicUpdateGroup);
}
visibles.add(currEntry.getValue());
currEntry = lowerEntry;
}
final RootPartitionRange foundRange = RootPartitionRange.of(
visibles.get(0).getStartRootPartitionId(),
visibles.get(visibles.size() - 1).getEndRootPartitionId()
);
if (!rangeOfAug.equals(foundRange)) {
return Collections.emptyList();
}
// visibles should be sorted.
visibles.sort(Comparator.comparing(RootPartitionRange::of));
return visibles;
}
@ -896,10 +925,14 @@ class OvershadowableManager<T extends Overshadowable<T>>
public boolean isComplete()
{
return visibleGroupPerRange
.values()
.stream()
.allMatch(map -> Iterables.getOnlyElement(map.values()).isFull());
return Iterators.all(
visibleGroupPerRange.values().iterator(),
map -> {
SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>> singleMap =
(SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>>) map;
//noinspection ConstantConditions
return singleMap.val.isFull();
});
}
@Nullable
@ -922,13 +955,18 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
}
Stream<PartitionChunk<T>> createVisibleChunksStream()
Iterator<PartitionChunk<T>> visibleChunksIterator()
{
return visibleGroupPerRange
.values()
.stream()
.flatMap((Short2ObjectSortedMap<AtomicUpdateGroup<T>> map) -> map.values().stream())
.flatMap((AtomicUpdateGroup<T> aug) -> aug.getChunks().stream());
final FluentIterable<Short2ObjectSortedMap<AtomicUpdateGroup<T>>> versionToGroupIterable = FluentIterable.from(
visibleGroupPerRange.values()
);
return versionToGroupIterable
.transformAndConcat(map -> {
SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>> singleMap =
(SingleEntryShort2ObjectSortedMap<AtomicUpdateGroup<T>>) map;
//noinspection ConstantConditions
return singleMap.val.getChunks();
}).iterator();
}
List<PartitionChunk<T>> getOvershadowedChunks()
@ -1086,6 +1124,12 @@ class OvershadowableManager<T extends Overshadowable<T>>
val = null;
}
/**
 * Creates a map holding the given key and value.
 *
 * @param key the key of the entry
 * @param val the value of the entry
 */
private SingleEntryShort2ObjectSortedMap(short key, V val)
{
this.key = key;
this.val = val;
}
@Override
public Short2ObjectSortedMap<V> subMap(short fromKey, short toKey)
{

View File

@ -26,8 +26,6 @@ import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.stream.Stream;
/**
* An object that clumps together multiple other objects which each represent a shard of some space.
@ -36,6 +34,18 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
{
private final OvershadowableManager<T> overshadowableManager;
/**
 * Creates a new holder from the result of {@link OvershadowableManager#copyVisible} applied to the manager of the
 * given holder.
 *
 * @param partitionHolder the holder to copy
 * @return a new holder backed by its own copy of the manager
 */
public static <T extends Overshadowable<T>> PartitionHolder<T> copyWithOnlyVisibleChunks(
    PartitionHolder<T> partitionHolder
)
{
  final OvershadowableManager<T> visibleOnly = OvershadowableManager.copyVisible(
      partitionHolder.overshadowableManager
  );
  return new PartitionHolder<>(visibleOnly);
}
/**
 * Creates a new holder from the result of {@link OvershadowableManager#deepCopy} applied to the manager of the
 * given holder.
 *
 * @param partitionHolder the holder to copy
 * @return a new holder backed by its own copy of the manager
 */
public static <T extends Overshadowable<T>> PartitionHolder<T> deepCopy(PartitionHolder<T> partitionHolder)
{
  final OvershadowableManager<T> copiedManager = OvershadowableManager.deepCopy(
      partitionHolder.overshadowableManager
  );
  return new PartitionHolder<>(copiedManager);
}
public PartitionHolder(PartitionChunk<T> initialChunk)
{
this.overshadowableManager = new OvershadowableManager<>();
@ -50,9 +60,14 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
}
}
public PartitionHolder(PartitionHolder<T> partitionHolder)
protected PartitionHolder(OvershadowableManager<T> overshadowableManager)
{
this.overshadowableManager = new OvershadowableManager<>(partitionHolder.overshadowableManager);
this.overshadowableManager = overshadowableManager;
}
/**
 * Returns an {@link ImmutablePartitionHolder} created from the result of
 * {@link OvershadowableManager#copyVisible} applied to the manager of this holder.
 *
 * @return a new immutable holder backed by its own copy of the manager
 */
public ImmutablePartitionHolder<T> asImmutable()
{
  final OvershadowableManager<T> visibleOnly = OvershadowableManager.copyVisible(overshadowableManager);
  return new ImmutablePartitionHolder<>(visibleOnly);
}
public boolean add(PartitionChunk<T> chunk)
@ -112,18 +127,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
@Override
public Iterator<PartitionChunk<T>> iterator()
{
return stream().iterator();
}
@Override
public Spliterator<PartitionChunk<T>> spliterator()
{
return stream().spliterator();
}
public Stream<PartitionChunk<T>> stream()
{
return overshadowableManager.createVisibleChunksStream();
return overshadowableManager.visibleChunksIterator();
}
public List<PartitionChunk<T>> getOvershadowed()

View File

@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionHolder;
@ -225,22 +224,22 @@ public class VersionedIntervalTimelineSpecificDataTest extends VersionedInterval
public void testFindEntry()
{
Assert.assertEquals(
new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))),
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1")
);
Assert.assertEquals(
new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))),
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1")
);
Assert.assertEquals(
new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))),
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1")
);
Assert.assertEquals(
new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))),
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1")
);

View File

@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionHolder;
@ -58,7 +57,7 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
add("2011-01-02/2011-01-05", "2", 1);
Assert.assertEquals(
new ImmutablePartitionHolder<>(new PartitionHolder<>(makeSingle("1", 1))),
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1")
);
}
@ -1407,9 +1406,7 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
new PartitionHolder<>(
ImmutableList.of(
makeNumbered("1", 0, 0),
makeNumbered("1", 1, 0),
makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3),
makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3)
makeNumbered("1", 1, 0)
)
)
)
@ -1500,4 +1497,79 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
Assert.assertEquals(2, Lists.newArrayList(overshadowableIntegers.iterator()).size());
}
/**
 * Populates three consecutive day intervals and checks that, with {@code Partitions.ONLY_COMPLETE},
 * the lookup over all three days returns the overwriting chunks of the first day and the two chunks
 * of the second day, but nothing from the third day.
 */
@Test
public void testFindNonOvershadowedObjectsInIntervalWithOnlyCompletePartitionsReturningValidResult()
{
  final String firstDay = "2019-01-01/2019-01-02";
  final String secondDay = "2019-01-02/2019-01-03";
  final String thirdDay = "2019-01-03/2019-01-04";

  // First day, version "0": three numbered chunks.
  for (int partitionNum = 0; partitionNum < 3; partitionNum++) {
    add(firstDay, "0", makeNumbered("0", partitionNum, 0));
  }
  // Second day, version "0": two numbered chunks.
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(secondDay, "0", makeNumbered("0", partitionNum, 0));
  }
  // Third day: the chunks are created with chunks = 3 but only two of them are added,
  // so this interval is the incomplete one.
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(thirdDay, "0", makeNumbered("2", partitionNum, 3, 0));
  }
  // First day again, version "1": two numbered chunks on top of version "0" ...
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(firstDay, "1", makeNumbered("1", partitionNum, 0));
  }
  // ... followed by three overwriting chunks for the same interval and version.
  for (int ordinal = 0; ordinal < 3; ordinal++) {
    add(firstDay, "1", makeNumberedOverwriting("1", ordinal, 1, 0, 2, 1, 3));
  }

  final ImmutableSet<OvershadowableInteger> expected = ImmutableSet.of(
      makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3).getObject(),
      makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3).getObject(),
      makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3).getObject(),
      makeNumbered("0", 0, 0).getObject(),
      makeNumbered("0", 1, 0).getObject()
  );
  Assert.assertEquals(
      expected,
      timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2019-01-01/2019-01-04"), Partitions.ONLY_COMPLETE)
  );
}
/**
 * Populates three consecutive day intervals and checks that, with {@code Partitions.INCOMPLETE_OK},
 * the lookup over all three days returns the overwriting chunks of the first day, the two chunks of
 * the second day, and also the two chunks of the incomplete third day.
 */
@Test
public void testFindNonOvershadowedObjectsInIntervalWithIncompleteOkReturningValidResult()
{
  final String firstDay = "2019-01-01/2019-01-02";
  final String secondDay = "2019-01-02/2019-01-03";
  final String thirdDay = "2019-01-03/2019-01-04";

  // First day, version "0": three numbered chunks.
  for (int partitionNum = 0; partitionNum < 3; partitionNum++) {
    add(firstDay, "0", makeNumbered("0", partitionNum, 0));
  }
  // Second day, version "0": two numbered chunks.
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(secondDay, "0", makeNumbered("0", partitionNum, 0));
  }
  // Third day: the chunks are created with chunks = 3 but only two of them are added,
  // so this interval is the incomplete one.
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(thirdDay, "0", makeNumbered("2", partitionNum, 3, 0));
  }
  // First day again, version "1": two numbered chunks on top of version "0" ...
  for (int partitionNum = 0; partitionNum < 2; partitionNum++) {
    add(firstDay, "1", makeNumbered("1", partitionNum, 0));
  }
  // ... followed by three overwriting chunks for the same interval and version.
  for (int ordinal = 0; ordinal < 3; ordinal++) {
    add(firstDay, "1", makeNumberedOverwriting("1", ordinal, 1, 0, 2, 1, 3));
  }

  final ImmutableSet<OvershadowableInteger> expected = ImmutableSet.of(
      makeNumberedOverwriting("1", 0, 1, 0, 2, 1, 3).getObject(),
      makeNumberedOverwriting("1", 1, 1, 0, 2, 1, 3).getObject(),
      makeNumberedOverwriting("1", 2, 1, 0, 2, 1, 3).getObject(),
      makeNumbered("0", 0, 0).getObject(),
      makeNumbered("0", 1, 0).getObject(),
      makeNumbered("2", 0, 3, 0).getObject(),
      makeNumbered("2", 1, 3, 0).getObject()
  );
  Assert.assertEquals(
      expected,
      timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2019-01-01/2019-01-04"), Partitions.INCOMPLETE_OK)
  );
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.timeline;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -107,7 +108,10 @@ public class VersionedIntervalTimelineTestBase
void checkRemove()
{
for (TimelineObjectHolder<String, OvershadowableInteger> holder : timeline.findFullyOvershadowed()) {
for (PartitionChunk<OvershadowableInteger> chunk : holder.getObject()) {
// Copy chunks to avoid the ConcurrentModificationException.
// Note that timeline.remove() modifies the PartitionHolder.
List<PartitionChunk<OvershadowableInteger>> chunks = FluentIterable.from(holder.getObject()).toList();
for (PartitionChunk<OvershadowableInteger> chunk : chunks) {
timeline.remove(holder.getInterval(), holder.getVersion(), chunk);
}
}
@ -154,26 +158,36 @@ public class VersionedIntervalTimelineTestBase
);
}
PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int value)
public static PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int value)
{
return makeSingle(majorVersion, 0, value);
}
private PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int partitionNum, int val)
public static PartitionChunk<OvershadowableInteger> makeSingle(String majorVersion, int partitionNum, int val)
{
return new SingleElementPartitionChunk<>(new OvershadowableInteger(majorVersion, partitionNum, val));
}
PartitionChunk<OvershadowableInteger> makeNumbered(String majorVersion, int partitionNum, int val)
public static PartitionChunk<OvershadowableInteger> makeNumbered(String majorVersion, int partitionNum, int val)
{
return makeNumbered(majorVersion, partitionNum, 0, val);
}
public static PartitionChunk<OvershadowableInteger> makeNumbered(
String majorVersion,
int partitionNum,
int chunks,
int val
)
{
return new NumberedPartitionChunk<>(
partitionNum,
0,
chunks,
new OvershadowableInteger(majorVersion, partitionNum, val)
);
}
PartitionChunk<OvershadowableInteger> makeNumberedOverwriting(
public static PartitionChunk<OvershadowableInteger> makeNumberedOverwriting(
String majorVersion,
int partitionNumOrdinal,
int val,

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.timeline.VersionedIntervalTimelineTestBase;
import org.junit.Assert;
import org.junit.Test;
import java.util.stream.IntStream;
/**
 * Unit tests for {@link AtomicUpdateGroup}: copying a populated group and the
 * {@code equals}/{@code hashCode} contract.
 */
public class AtomicUpdateGroupTest
{
  /**
   * Fills a group with ten overwriting chunks (ordinals 0 through 9, all other arguments identical)
   * and verifies that {@link AtomicUpdateGroup#copy} yields a group equal to the source.
   */
  @Test
  public void testCopy()
  {
    // The group is seeded with the chunk of ordinal 0 ...
    AtomicUpdateGroup<OvershadowableInteger> original = new AtomicUpdateGroup<>(
        VersionedIntervalTimelineTestBase.makeNumberedOverwriting("0", 0, 0, 0, 10, 0, 10)
    );
    // ... and the chunks of ordinals 1..9 are appended afterwards.
    IntStream.range(1, 10).forEach(
        i -> original.add(
            VersionedIntervalTimelineTestBase.makeNumberedOverwriting("0", i, 0, 0, 10, 0, 10)
        )
    );
    // assertEquals takes (expected, actual): the source group is the reference value and the copy is
    // the value under test, so a failure message reports them the right way round.
    Assert.assertEquals(original, AtomicUpdateGroup.copy(original));
  }

  /**
   * Checks the {@code equals}/{@code hashCode} contract of {@link AtomicUpdateGroup} with
   * EqualsVerifier, configured via {@code usingGetClass()}.
   */
  @Test
  public void testEqualAndHashCodeContract()
  {
    EqualsVerifier.forClass(AtomicUpdateGroup.class).usingGetClass().verify();
  }
}

View File

@ -20,7 +20,9 @@
package org.apache.druid.timeline.partition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.timeline.partition.OvershadowableManager.RootPartitionRange;
import org.apache.druid.timeline.partition.OvershadowableManager.State;
import org.junit.Assert;
@ -63,6 +65,63 @@ public class OvershadowableManagerTest
expectedStandbyChunks = new ArrayList<>();
}
/**
 * Verifies that {@code OvershadowableManager.copyVisible} produces a manager that has no
 * overshadowed and no standby chunks, and whose visible chunks match those of the source manager.
 */
@Test
public void testCopyVisible()
{
  // Two root chunks first.
  for (int i = 0; i < 2; i++) {
    manager.addChunk(newRootChunk());
  }
  // Three non-root chunks that overshadow the partition id range [0, 2).
  for (int i = 0; i < 3; i++) {
    manager.addChunk(newNonRootChunk(0, 2, 1, 3));
  }
  // Two further root chunks.
  for (int i = 0; i < 2; i++) {
    manager.addChunk(newRootChunk());
  }
  // A single non-root chunk created with (2, 4, 1, 3); it is meant to end up as a standby chunk.
  manager.addChunk(newNonRootChunk(2, 4, 1, 3));

  final OvershadowableManager<OvershadowableInteger> visibleOnly = OvershadowableManager.copyVisible(manager);

  Assert.assertTrue(visibleOnly.getOvershadowedChunks().isEmpty());
  Assert.assertTrue(visibleOnly.getStandbyChunks().isEmpty());
  Assert.assertEquals(
      Lists.newArrayList(manager.visibleChunksIterator()),
      Lists.newArrayList(visibleOnly.visibleChunksIterator())
  );
}
/**
 * Verifies that {@code OvershadowableManager.deepCopy} produces a manager equal to the source
 * manager after root and non-root chunks have been added to it.
 */
@Test
public void testDeepCopy()
{
  // Two root chunks first.
  for (int i = 0; i < 2; i++) {
    manager.addChunk(newRootChunk());
  }
  // Three non-root chunks that overshadow the partition id range [0, 2).
  for (int i = 0; i < 3; i++) {
    manager.addChunk(newNonRootChunk(0, 2, 1, 3));
  }
  // Two further root chunks.
  for (int i = 0; i < 2; i++) {
    manager.addChunk(newRootChunk());
  }
  // A single non-root chunk created with (2, 4, 1, 3); it is meant to end up as a standby chunk.
  manager.addChunk(newNonRootChunk(2, 4, 1, 3));

  final OvershadowableManager<OvershadowableInteger> duplicate = OvershadowableManager.deepCopy(manager);
  Assert.assertEquals(manager, duplicate);
}
/**
 * Checks the {@code equals}/{@code hashCode} contract of {@code OvershadowableManager} with
 * EqualsVerifier, configured via {@code usingGetClass()}.
 */
@Test
public void testEqualAndHashCodeContract()
{
  EqualsVerifier
      .forClass(OvershadowableManager.class)
      .usingGetClass()
      .verify();
}
@Test
public void testFindOvershadowedBy()
{
@ -981,7 +1040,7 @@ public class OvershadowableManagerTest
Assert.assertEquals(
"Mismatched visible chunks",
new HashSet<>(expectedVisibleChunks),
Sets.newHashSet(manager.createVisibleChunksStream().iterator())
Sets.newHashSet(manager.visibleChunksIterator())
);
Assert.assertEquals(
"Mismatched overshadowed chunks",

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.FluentIterable;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputRow;
@ -65,6 +66,7 @@ import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
@ -223,11 +225,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
final Set<DataSegment> allSegments = new HashSet<>(getTaskLockHelper().getLockedExistingSegments());
allSegments.addAll(pushedSegments);
final VersionedIntervalTimeline<String, DataSegment> timeline = VersionedIntervalTimeline.forSegments(allSegments);
final Set<DataSegment> oldSegments = timeline.findFullyOvershadowed()
.stream()
.flatMap(holder -> holder.getObject().stream())
.map(PartitionChunk::getObject)
.collect(Collectors.toSet());
final Set<DataSegment> oldSegments = FluentIterable.from(timeline.findFullyOvershadowed())
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)
.toSet();
taskClient.report(supervisorTaskId, new PushedSegmentsReport(getId(), oldSegments, pushedSegments));
return TaskStatus.success(getId());

View File

@ -25,10 +25,10 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
@ -171,15 +171,16 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
final List<TimelineObjectHolder<String, DataSegment>> timeline = createTimeline();
final Stream<InputEntity> entityStream = timeline
.stream()
.flatMap(holder -> {
final Iterator<DruidSegmentInputEntity> entityIterator = FluentIterable
.from(timeline)
.transformAndConcat(holder -> {
//noinspection ConstantConditions
final PartitionHolder<DataSegment> partitionHolder = holder.getObject();
return partitionHolder
.stream()
.map(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
});
//noinspection ConstantConditions
return FluentIterable
.from(partitionHolder)
.transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()));
}).iterator();
final List<String> effectiveDimensions;
if (dimensions == null) {
effectiveDimensions = ReingestionTimelineUtils.getUniqueDimensions(
@ -209,7 +210,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
entityStream.iterator(),
entityIterator,
temporaryDirectory
);
}

View File

@ -168,8 +168,9 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
runTestTask(inputInterval, Granularities.DAY);
final Interval interval = inputInterval == null ? Intervals.ETERNITY : inputInterval;
final Collection<DataSegment> allSegments =
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
final Collection<DataSegment> allSegments = new HashSet<>(
getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE)
);
// Reingest the same data. Each segment should get replaced by a segment with a newer version.
runTestTask(inputInterval, secondSegmentGranularity);

View File

@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
import com.google.common.collect.FluentIterable;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask;
@ -250,11 +251,13 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
);
final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(Intervals.ETERNITY);
return holders
.stream()
.flatMap(holder -> holder.getObject().stream())
.anyMatch(chunk -> oldVersions.stream()
.anyMatch(oldSegment -> chunk.getObject().overshadows(oldSegment)));
return FluentIterable
.from(holders)
.transformAndConcat(TimelineObjectHolder::getObject)
.anyMatch(
chunk -> FluentIterable.from(oldVersions)
.anyMatch(oldSegment -> chunk.getObject().overshadows(oldSegment))
);
},
"See a new version"
);

View File

@ -169,5 +169,4 @@ public class DataSourcesSnapshot
}
return overshadowedSegments;
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@ -71,13 +72,11 @@ import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
/**
*/
@ -811,9 +810,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return null;
} else {
if (existingChunks
.stream()
.flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
//noinspection ConstantConditions
if (FluentIterable
.from(existingChunks)
.transformAndConcat(TimelineObjectHolder::getObject)
.anyMatch(chunk -> !chunk.getObject().getShardSpec().isCompatible(partialShardSpec.getShardSpecClass()))) {
// All existing segments should have a compatible shardSpec with partialShardSpec.
return null;
@ -825,15 +825,19 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
maxId = StreamSupport
.stream(existingHolder.getObject().spliterator(), false)
//noinspection ConstantConditions
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
// Here we check only the segments of the same shardSpec to find out the max partitionId.
// Note that OverwriteShardSpec has the higher range for partitionId than others.
// See PartitionIds.
.filter(chunk -> chunk.getObject().getShardSpec().getClass() == partialShardSpec.getShardSpecClass())
.max(Comparator.comparing(chunk -> chunk.getObject().getShardSpec().getPartitionNum()))
.map(chunk -> SegmentIdWithShardSpec.fromDataSegment(chunk.getObject()))
.orElse(null);
.filter(segment -> segment.getShardSpec().getClass() == partialShardSpec.getShardSpecClass())) {
// Don't use the stream API for performance.
if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
final List<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(

View File

@ -22,10 +22,12 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Maps;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
@ -54,7 +56,6 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* This class iterates all segments of the dataSources configured for compaction from the newest to the oldest.
@ -119,13 +120,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(interval);
resultMap.put(
entry.getDataSource(),
holders.stream()
.flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
.mapToLong(chunk -> chunk.getObject().getSize())
.sum()
);
long size = 0;
for (DataSegment segment : FluentIterable
.from(holders)
.transformAndConcat(TimelineObjectHolder::getObject)
.transform(PartitionChunk::getObject)) {
size += segment.getSize();
}
resultMap.put(entry.getDataSource(), size);
}
return resultMap;
}
@ -237,11 +240,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (holders.isEmpty()) {
throw new NoSuchElementException();
}
return holders.remove(holders.size() - 1)
.getObject()
.stream()
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
return FluentIterable.from(holders.remove(holders.size() - 1).getObject())
.transform(PartitionChunk::getObject)
.toList();
}
}
@ -509,11 +510,16 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private QueueEntry(List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && !segments.isEmpty());
Collections.sort(segments);
this.interval = new Interval(
segments.get(0).getInterval().getStart(),
segments.get(segments.size() - 1).getInterval().getEnd()
);
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
this.interval = new Interval(minStart, maxEnd);
this.segments = segments;
}

View File

@ -30,6 +30,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
@ -44,6 +45,7 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
@ -74,11 +76,16 @@ public class CompactSegmentsTest
)
{
Preconditions.checkArgument(segments.size() > 1);
Collections.sort(segments);
Interval compactInterval = new Interval(
segments.get(0).getInterval().getStart(),
segments.get(segments.size() - 1).getInterval().getEnd()
);
DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
for (DataSegment segment : segments) {
if (segment.getInterval().getStart().compareTo(minStart) < 0) {
minStart = segment.getInterval().getStart();
}
if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
maxEnd = segment.getInterval().getEnd();
}
}
Interval compactInterval = new Interval(minStart, maxEnd);
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
segments.forEach(
segment -> timeline.remove(