Fix bugs in overshadowableManager and add unit tests (#8222)

* Fix bugs in overshadowableManager and add unit tests

* Fix SegmentManager

* add segment manager test

* Address comments

* Address comments
This commit is contained in:
Jihoon Son 2019-08-07 13:51:21 -07:00 committed by Jonathan Wei
parent 172ebba4b8
commit 8fa114c349
15 changed files with 1836 additions and 287 deletions

View File

@ -45,7 +45,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
// This may matter if there are a lot of segments to keep in memory as in brokers or the coordinator.
private final List<PartitionChunk<T>> chunks = new ArrayList<>();
public AtomicUpdateGroup(PartitionChunk<T> chunk)
AtomicUpdateGroup(PartitionChunk<T> chunk)
{
this.chunks.add(chunk);
}
@ -60,7 +60,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
}
for (PartitionChunk<T> existing : chunks) {
if (existing.equals(chunk)) {
return;
throw new ISE("Can't add same chunk[%s] again", chunk);
}
}
chunks.add(chunk);
@ -95,7 +95,7 @@ class AtomicUpdateGroup<T extends Overshadowable<T>> implements Overshadowable<A
}
@Nullable
public PartitionChunk<T> findChunk(int partitionId)
PartitionChunk<T> findChunk(int partitionId)
{
return chunks.stream().filter(chunk -> chunk.getChunkNumber() == partitionId).findFirst().orElse(null);
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.timeline.partition;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.objects.AbstractObjectCollection;
@ -41,6 +42,7 @@ import org.apache.druid.timeline.Overshadowable;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -48,11 +50,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
/**
* OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details of
* the possible state.
* OvershadowableManager manages the state of {@link AtomicUpdateGroup}. See the below {@link State} for details.
* Note that an AtomicUpdateGroup can consist of {@link Overshadowable}s of the same majorVersion, minorVersion,
* rootPartition range, and atomicUpdateGroupSize.
* In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same
@ -62,11 +66,22 @@ import java.util.TreeMap;
*/
class OvershadowableManager<T extends Overshadowable<T>>
{
private enum State
/**
* There are 3 states for atomicUpdateGroups.
* There could be at most one visible atomicUpdateGroup at any time in a non-empty overshadowableManager.
*
* - Visible: fully available atomicUpdateGroup of the highest version if any.
* If there's no fully available atomicUpdateGroup, the standby atomicUpdateGroup of the highest version
* becomes visible.
* - Standby: all atomicUpdateGroups of higher versions than that of the visible atomicUpdateGroup.
* - Overshadowed: all atomicUpdateGroups of lower versions than that of the visible atomicUpdateGroup.
*/
@VisibleForTesting
enum State
{
STANDBY, // have atomicUpdateGroup of higher versions than visible
VISIBLE, // have a single fully available atomicUpdateGroup of highest version
OVERSHADOWED // have atomicUpdateGroup of lower versions than visible
STANDBY,
VISIBLE,
OVERSHADOWED
}
private final Map<Integer, PartitionChunk<T>> knownPartitionChunks; // served segments
@ -92,6 +107,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
this.overshadowedGroups = new TreeMap<>(other.overshadowedGroups);
}
private OvershadowableManager(List<AtomicUpdateGroup<T>> groups)
{
this();
for (AtomicUpdateGroup<T> entry : groups) {
for (PartitionChunk<T> chunk : entry.getChunks()) {
addChunk(chunk);
}
}
}
private TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> getStateMap(State state)
{
switch (state) {
@ -125,7 +150,39 @@ class OvershadowableManager<T extends Overshadowable<T>>
Preconditions.checkArgument(!atomicUpdateGroup.isEmpty(), "empty atomicUpdateGroup");
removeFrom(atomicUpdateGroup, from);
addAtomicUpdateGroupWithState(atomicUpdateGroup, to);
addAtomicUpdateGroupWithState(atomicUpdateGroup, to, false);
}
/**
* Replace the oldVisibleGroups with the newVisibleGroups.
* This method first removes the oldVisibleGroups from the visibles map,
* moves the newVisibleGroups from its old state map to the visibles map,
* and finally add the oldVisibleGroups to its new state map.
*/
private void replaceVisibleWith(
Collection<AtomicUpdateGroup<T>> oldVisibleGroups,
State newStateOfOldVisibleGroup,
List<AtomicUpdateGroup<T>> newVisibleGroups,
State oldStateOfNewVisibleGroups
)
{
oldVisibleGroups.forEach(
group -> {
if (!group.isEmpty()) {
removeFrom(group, State.VISIBLE);
}
}
);
newVisibleGroups.forEach(
entry -> transitAtomicUpdateGroupState(entry, oldStateOfNewVisibleGroups, State.VISIBLE)
);
oldVisibleGroups.forEach(
group -> {
if (!group.isEmpty()) {
addAtomicUpdateGroupWithState(group, newStateOfOldVisibleGroup, false);
}
}
);
}
/**
@ -179,7 +236,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
return null;
}
private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
private List<AtomicUpdateGroup<T>> findOvershadowedBy(
AtomicUpdateGroup<T> aug,
State fromState
)
@ -189,89 +246,372 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
/**
* Find all atomicUpdateGroups of the given state overshadowed by the given rootPartitionRange and minorVersion.
* Find all atomicUpdateGroups of the given state overshadowed by the minorVersion in the given rootPartitionRange.
* The atomicUpdateGroup of a higher minorVersion can have a wider RootPartitionRange.
* To find all atomicUpdateGroups overshadowed by the given rootPartitionRange and minorVersion,
* we first need to find the first key contained by the given rootPartitionRange.
* Once we find such key, then we go through the entire map until we see an atomicUpdateGroup of which
* rootRangePartition is not contained by the given rootPartitionRange.
*
* @param rangeOfAug the partition range to search for overshadowed groups.
* @param minorVersion the minor version to check overshadow relation. The found groups will have lower minor versions
* than this.
* @param fromState the state to search for overshadowed groups.
*
* @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
*/
private List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> findOvershadowedBy(
RootPartitionRange rangeOfAug,
short minorVersion,
State fromState
)
@VisibleForTesting
List<AtomicUpdateGroup<T>> findOvershadowedBy(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
{
final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
rangeOfAug,
stateMap,
true
);
if (current == null) {
return Collections.emptyList();
}
// Find the first key for searching for overshadowed atomicUpdateGroup
while (true) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
current.getKey()
);
if (lowerEntry != null && lowerEntry.getKey().startPartitionId == rangeOfAug.startPartitionId) {
current = lowerEntry;
} else {
break;
}
}
// Going through the map to find all entries of the RootPartitionRange contained by the given rangeOfAug.
// Note that RootPartitionRange of entries are always consecutive.
final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> found = new ArrayList<>();
while (current != null && rangeOfAug.contains(current.getKey())) {
// versionToGroup is sorted by minorVersion.
// versionToGroup.subMap(firstKey, minorVersion) below returns a map containing all entries of lower minorVersions
// than the given minorVersion.
final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
// Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
// Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
// See AbstractObjectCollection.toArray().
// If you see performance degradation here, probably we need to improve the below line.
found.addAll(versionToGroup.subMap(versionToGroup.firstShortKey(), minorVersion).short2ObjectEntrySet());
final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
while (current != null && rangeOfAug.overlaps(current.getKey())) {
if (rangeOfAug.contains(current.getKey())) {
// versionToGroup is sorted by minorVersion.
// versionToGroup.headMap(minorVersion) below returns a map containing all entries of lower minorVersions
// than the given minorVersion.
final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
// Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
// Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
// See AbstractObjectCollection.toArray().
// If you see performance degradation here, probably we need to improve the below line.
if (versionToGroup.firstShortKey() < minorVersion) {
found.addAll(versionToGroup.headMap(minorVersion).values());
}
}
current = stateMap.higherEntry(current.getKey());
}
return found;
}
private List<AtomicUpdateGroup<T>> findOvershadows(AtomicUpdateGroup<T> aug, State fromState)
{
return findOvershadows(RootPartitionRange.of(aug), aug.getMinorVersion(), fromState);
}
/**
* Find all atomicUpdateGroups which overshadow others of the given minorVersion in the given rootPartitionRange.
* Similar to {@link #findOvershadowedBy}.
*
* Note that one atomicUpdateGroup can overshadow multiple other groups. If you're finding overshadowing
* atomicUpdateGroups by calling this method in a loop, the results of this method can contain duplicate groups.
*
* @param rangeOfAug the partition range to search for overshadowing groups.
* @param minorVersion the minor version to check overshadow relation. The found groups will have higher minor
* versions than this.
* @param fromState the state to search for overshadowed groups.
*
* @return a list of found atomicUpdateGroups. It could be empty if no groups are found.
*/
@VisibleForTesting
List<AtomicUpdateGroup<T>> findOvershadows(RootPartitionRange rangeOfAug, short minorVersion, State fromState)
{
final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap = getStateMap(fromState);
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = findLowestOverlappingEntry(
rangeOfAug,
stateMap,
false
);
if (current == null) {
return Collections.emptyList();
}
// Going through the map to find all entries of the RootPartitionRange contains the given rangeOfAug.
// Note that RootPartitionRange of entries are always consecutive.
final List<AtomicUpdateGroup<T>> found = new ArrayList<>();
while (current != null && current.getKey().overlaps(rangeOfAug)) {
if (current.getKey().contains(rangeOfAug)) {
// versionToGroup is sorted by minorVersion.
// versionToGroup.tailMap(minorVersion) below returns a map containing all entries of equal to or higher
// minorVersions than the given minorVersion.
final Short2ObjectSortedMap<AtomicUpdateGroup<T>> versionToGroup = current.getValue();
// Short2ObjectRBTreeMap.SubMap.short2ObjectEntrySet() implementation, especially size(), is not optimized.
// Note that size() is indirectly called in ArrayList.addAll() when ObjectSortedSet.toArray() is called.
// See AbstractObjectCollection.toArray().
// If you see performance degradation here, probably we need to improve the below line.
if (versionToGroup.lastShortKey() > minorVersion) {
found.addAll(versionToGroup.tailMap(minorVersion).values());
}
}
current = stateMap.higherEntry(current.getKey());
}
return found;
}
/**
* Handles addition of the atomicUpdateGroup to the given state
* Finds the lowest entry overlapping with the given root partition range.
* It first searches the entries lower than or equal to the given range.
* If there's no such entry lower than the given range, then it searches the entries higher than the given range.
*
* @return an entry of the lowest key overlapping with the given range. Otherwise null.
*/
private void transitionStandbyGroupIfFull(AtomicUpdateGroup<T> aug, State stateOfAug)
@Nullable
private Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> findLowestOverlappingEntry(
RootPartitionRange rangeOfAug,
TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap,
boolean strictSameStartId
)
{
// Searches the entries lower than or equal to the given range.
Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> current = stateMap.floorEntry(rangeOfAug);
if (current == null) {
// Searches the entries higher than then given range.
current = stateMap.higherEntry(rangeOfAug);
}
if (current == null) {
return null;
}
// floorEntry() can return the greatest key less than rangeOfAug. We need to skip non-overlapping keys.
while (current != null && !current.getKey().overlaps(rangeOfAug)) {
current = stateMap.higherEntry(current.getKey());
}
final BiPredicate<RootPartitionRange, RootPartitionRange> predicate;
if (strictSameStartId) {
predicate = (entryRange, groupRange) -> entryRange.startPartitionId == groupRange.startPartitionId;
} else {
predicate = RootPartitionRange::overlaps;
}
// There could be multiple entries of the same startPartitionId but different endPartitionId.
// Find the first key of the same startPartitionId which has the lowest endPartitionId.
while (current != null) {
final Entry<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> lowerEntry = stateMap.lowerEntry(
current.getKey()
);
if (lowerEntry != null && predicate.test(lowerEntry.getKey(), rangeOfAug)) {
current = lowerEntry;
} else {
break;
}
}
return current;
}
/**
* Determine the visible group after a new chunk is added.
*/
private void determineVisibleGroupAfterAdd(AtomicUpdateGroup<T> aug, State stateOfAug)
{
if (stateOfAug == State.STANDBY) {
// A standby atomicUpdateGroup becomes visible when its all segments are available.
if (aug.isFull()) {
// A visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup becomes
// visible which overshadows the current visible one.
findOvershadowedBy(aug, State.VISIBLE)
.forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.VISIBLE, State.OVERSHADOWED));
findOvershadowedBy(aug, State.STANDBY)
.forEach(entry -> transitAtomicUpdateGroupState(entry.getValue(), State.STANDBY, State.OVERSHADOWED));
transitAtomicUpdateGroupState(aug, State.STANDBY, State.VISIBLE);
moveNewStandbyToVisibleIfNecessary(aug, stateOfAug);
} else if (stateOfAug == State.OVERSHADOWED) {
checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(aug, stateOfAug);
}
}
/**
* This method is called in {@link #determineVisibleGroupAfterAdd}.
* The given standby group can be visible in the below two cases:
*
* - The standby group is full. Since every standby group has a higher version than the current visible group,
* it should become visible immediately when it's full.
* - The standby group is not full but not empty and the current visible is not full. If there's no fully available
* group, the group of the highest version should be the visible.
*/
private void moveNewStandbyToVisibleIfNecessary(AtomicUpdateGroup<T> standbyGroup, State stateOfGroup)
{
assert stateOfGroup == State.STANDBY;
// A standby atomicUpdateGroup becomes visible when its all segments are available.
if (standbyGroup.isFull()) {
// A current visible atomicUpdateGroup becomes overshadowed when a fully available standby atomicUpdateGroup
// becomes visible.
replaceVisibleWith(
findOvershadowedBy(standbyGroup, State.VISIBLE),
State.OVERSHADOWED,
Collections.singletonList(standbyGroup),
State.STANDBY
);
findOvershadowedBy(standbyGroup, State.STANDBY)
.forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
} else {
// The given atomicUpdateGroup is in the standby state which means it's not overshadowed by the visible group.
// If the visible group is not fully available, then the new standby group should be visible since it has a
// higher minor version.
if (!standbyGroup.isEmpty()) {
// Check there are visible atomicUpdateGroups overshadowed by the given atomicUpdateGroup.
final List<AtomicUpdateGroup<T>> overshadowedVisibles = findOvershadowedBy(
standbyGroup,
State.VISIBLE
);
if (overshadowedVisibles.isEmpty()) {
// There is no visible atomicUpdateGroup for the rootPartitionRange of the given aug.
// The given aug should be visible.
transitAtomicUpdateGroupState(standbyGroup, State.STANDBY, State.VISIBLE);
findOvershadowedBy(standbyGroup, State.STANDBY)
.forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
} else {
// Check there is any missing chunk in the current visible groups.
// If the current visible groups don't cover the partitino range of the given standby group,
// the given standby group should be visible.
final boolean fullyCoverAugRange = doGroupsFullyCoverPartitionRange(
overshadowedVisibles,
standbyGroup.getStartRootPartitionId(),
standbyGroup.getEndRootPartitionId()
);
if (!fullyCoverAugRange) {
replaceVisibleWith(
overshadowedVisibles,
State.OVERSHADOWED,
Collections.singletonList(standbyGroup),
State.STANDBY
);
findOvershadowedBy(standbyGroup, State.STANDBY)
.forEach(entry -> transitAtomicUpdateGroupState(entry, State.STANDBY, State.OVERSHADOWED));
}
// If all visible atomicUpdateGroups are full, then the given atomicUpdateGroup should stay in the standby
// state.
}
}
}
}
private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state)
/**
* This method is called in {@link #determineVisibleGroupAfterAdd}. It first checks the current visible group is
* fully available. If not, it checks there are overshadowed groups which can cover the rootPartitionRange of
* the visible groups and are fully available. If it finds such groups, they become visible.
*/
private void checkVisibleIsFullyAvailableAndTryToMoveOvershadowedToVisible(
AtomicUpdateGroup<T> group,
State stateOfGroup
)
{
assert stateOfGroup == State.OVERSHADOWED;
if (group.isFull()) {
// Since this atomicUpdateGroup is full, it could be changed to visible if the current visible group is not
// fully available. To check this, we first check the current visible is fully available.
// And if not, we check the overshadowed groups are fully available and can cover the partition range of
// the atomicUpdateGroups overshadow the given overshadowed group.
// Visible or standby groups overshadowing the given group.
// Used to both 1) check fully available visible group and
// 2) get the partition range which the fully available overshadowed groups should cover to become visible.
final List<AtomicUpdateGroup<T>> groupsOvershadowingAug;
final boolean isOvershadowingGroupsFull;
final List<AtomicUpdateGroup<T>> overshadowingVisibles = findOvershadows(group, State.VISIBLE);
if (overshadowingVisibles.isEmpty()) {
final List<AtomicUpdateGroup<T>> overshadowingStandbys = findLatestNonFullyAvailableAtomicUpdateGroups(
findOvershadows(group, State.STANDBY)
);
if (overshadowingStandbys.isEmpty()) {
throw new ISE("WTH? atomicUpdateGroup[%s] is in overshadowed state, but no one overshadows it?", group);
}
groupsOvershadowingAug = overshadowingStandbys;
isOvershadowingGroupsFull = false;
} else {
groupsOvershadowingAug = overshadowingVisibles;
isOvershadowingGroupsFull = doGroupsFullyCoverPartitionRange(
groupsOvershadowingAug,
groupsOvershadowingAug.get(0).getStartRootPartitionId(),
groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
);
}
// If groupsOvershadowingAug is the standby groups, isOvershadowingGroupsFull is always false.
// If groupsOvershadowingAug is the visible groups, isOvershadowingGroupsFull indicates the visible group is
// fully available or not.
if (!isOvershadowingGroupsFull) {
// Let's check the overshadowed groups can cover the partition range of groupsOvershadowingAug
// and are fully available.
final List<AtomicUpdateGroup<T>> latestFullGroups = groupsOvershadowingAug
.stream()
.flatMap(eachFullgroup -> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
RootPartitionRange.of(eachFullgroup),
eachFullgroup.getMinorVersion()).stream()
)
.collect(Collectors.toList());
if (!latestFullGroups.isEmpty()) {
final boolean isOvershadowedGroupsFull = doGroupsFullyCoverPartitionRange(
latestFullGroups,
groupsOvershadowingAug.get(0).getStartRootPartitionId(),
groupsOvershadowingAug.get(groupsOvershadowingAug.size() - 1).getEndRootPartitionId()
);
if (isOvershadowedGroupsFull) {
replaceVisibleWith(overshadowingVisibles, State.STANDBY, latestFullGroups, State.OVERSHADOWED);
}
}
}
}
}
/**
* Checks if the given groups fully cover the given partition range. To fully cover the range, the given groups
* should satisfy the below:
*
* - All groups must be full.
* - All groups must be adjacent.
* - The lowest startPartitionId and the highest endPartitionId must be same with the given startPartitionId and
* the given endPartitionId, respectively.
*
* @param groups atomicUpdateGroups sorted by their rootPartitionRange
* @param startRootPartitionId the start partitionId of the root partition range to check the coverage
* @param endRootPartitionId the end partitionId of the root partition range to check the coverage
*
* @return true if the given groups fully cover the given partition range.
*/
private boolean doGroupsFullyCoverPartitionRange(
List<AtomicUpdateGroup<T>> groups,
int startRootPartitionId,
int endRootPartitionId
)
{
final int startRootPartitionIdOfOvershadowed = groups.get(0).getStartRootPartitionId();
final int endRootPartitionIdOfOvershadowed = groups.get(groups.size() - 1).getEndRootPartitionId();
if (startRootPartitionId != startRootPartitionIdOfOvershadowed
|| endRootPartitionId != endRootPartitionIdOfOvershadowed) {
return false;
} else {
int prevEndPartitionId = groups.get(0).getStartRootPartitionId();
for (AtomicUpdateGroup<T> group : groups) {
if (!group.isFull() || prevEndPartitionId != group.getStartRootPartitionId()) {
// If any visible atomicUpdateGroup overshadowed by the given standby atomicUpdateGroup is not full,
// then the given atomicUpdateGroup should be visible since it has a higher version.
return false;
}
prevEndPartitionId = group.getEndRootPartitionId();
}
}
return true;
}
private void addAtomicUpdateGroupWithState(AtomicUpdateGroup<T> aug, State state, boolean determineVisible)
{
final AtomicUpdateGroup<T> existing = getStateMap(state)
.computeIfAbsent(RootPartitionRange.of(aug), k -> createMinorVersionToAugMap(state))
.put(aug.getMinorVersion(), aug);
if (existing != null) {
throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", aug, state);
throw new ISE("AtomicUpdateGroup[%s] is already in state[%s]", existing, state);
}
transitionStandbyGroupIfFull(aug, state);
if (determineVisible) {
determineVisibleGroupAfterAdd(aug, state);
}
}
public boolean addChunk(PartitionChunk<T> chunk)
boolean addChunk(PartitionChunk<T> chunk)
{
// Sanity check. ExistingChunk should be usually null.
final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk);
@ -295,17 +635,36 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (atomicUpdateGroup != null) {
atomicUpdateGroup.add(chunk);
// If overshadowed atomicUpdateGroup is full and visible atomicUpdateGroup is not full,
// move overshadowed one to visible.
determineVisibleGroupAfterAdd(atomicUpdateGroup, State.OVERSHADOWED);
} else {
atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.STANDBY);
if (atomicUpdateGroup != null) {
atomicUpdateGroup.add(chunk);
transitionStandbyGroupIfFull(atomicUpdateGroup, State.STANDBY);
determineVisibleGroupAfterAdd(atomicUpdateGroup, State.STANDBY);
} else {
atomicUpdateGroup = findAtomicUpdateGroupWith(chunk, State.VISIBLE);
if (atomicUpdateGroup != null) {
atomicUpdateGroup.add(chunk);
if (atomicUpdateGroup.findChunk(chunk.getChunkNumber()) == null) {
// If this chunk is not in the atomicUpdateGroup, then we add the chunk to it if it's not full.
if (!atomicUpdateGroup.isFull()) {
atomicUpdateGroup.add(chunk);
} else {
throw new ISE("Can't add chunk[%s] to a full atomicUpdateGroup[%s]", chunk, atomicUpdateGroup);
}
} else {
// If this chunk is already in the atomicUpdateGroup, it should be in knownPartitionChunks
// and this code must not be executed.
throw new ISE(
"WTH? chunk[%s] is in the atomicUpdateGroup[%s] but not in knownPartitionChunks[%s]?",
chunk,
atomicUpdateGroup,
knownPartitionChunks
);
}
} else {
final AtomicUpdateGroup<T> newAtomicUpdateGroup = new AtomicUpdateGroup<>(chunk);
@ -317,9 +676,9 @@ class OvershadowableManager<T extends Overshadowable<T>>
.anyMatch(group -> group.overshadows(newAtomicUpdateGroup));
if (overshadowed) {
addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED);
addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.OVERSHADOWED, true);
} else {
addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY);
addAtomicUpdateGroupWithState(newAtomicUpdateGroup, State.STANDBY, true);
}
}
}
@ -328,7 +687,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
/**
* Handles of removal of an empty atomicUpdateGroup from a state.
* Handles the removal of an empty atomicUpdateGroup from a state.
*/
private void determineVisibleGroupAfterRemove(
AtomicUpdateGroup<T> augOfRemovedChunk,
@ -343,33 +702,101 @@ class OvershadowableManager<T extends Overshadowable<T>>
// is removed.
if (stateOfRemovedAug == State.VISIBLE) {
// All segments in the visible atomicUpdateGroup which overshadows this atomicUpdateGroup is removed.
// Fall back if there is a fully available overshadowed atomicUpdateGroup
// A chunk is removed from the current visible group.
// Fall back to
// 1) the latest fully available overshadowed group if any
// 2) the latest standby group if any
// 3) the latest overshadowed group if any
final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
// Check there is a fully available latest overshadowed atomicUpdateGroup.
final List<AtomicUpdateGroup<T>> latestFullAugs = findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
rangeOfAug,
minorVersion
);
// If there is no fully available fallback group, then the existing VISIBLE group remains VISIBLE.
// Otherwise, the latest fully available group becomes VISIBLE.
// If there are fully available overshadowed groups, then the latest one becomes visible.
if (!latestFullAugs.isEmpty()) {
// Move the atomicUpdateGroup to standby
// and move the fully available overshadowed atomicUpdateGroup to visible
if (!augOfRemovedChunk.isEmpty()) {
transitAtomicUpdateGroupState(augOfRemovedChunk, State.VISIBLE, State.STANDBY);
// The current visible atomicUpdateGroup becomes standby
// and the fully available overshadowed atomicUpdateGroups become visible
final Set<AtomicUpdateGroup<T>> overshadowsLatestFullAugsInVisible = latestFullAugs
.stream()
.flatMap(group -> findOvershadows(group, State.VISIBLE).stream())
.collect(Collectors.toSet());
replaceVisibleWith(
overshadowsLatestFullAugsInVisible,
State.STANDBY,
latestFullAugs,
State.OVERSHADOWED
);
latestFullAugs
.stream()
.flatMap(group -> findOvershadows(group, State.OVERSHADOWED).stream())
.collect(Collectors.toSet())
.forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.STANDBY));
} else {
// Find the latest non-fully available atomicUpdateGroups
final List<AtomicUpdateGroup<T>> latestStandby = findLatestNonFullyAvailableAtomicUpdateGroups(
findOvershadows(rangeOfAug, minorVersion, State.STANDBY)
);
if (!latestStandby.isEmpty()) {
final List<AtomicUpdateGroup<T>> overshadowedByLatestStandby = latestStandby
.stream()
.flatMap(group -> findOvershadowedBy(group, State.VISIBLE).stream())
.collect(Collectors.toList());
replaceVisibleWith(overshadowedByLatestStandby, State.OVERSHADOWED, latestStandby, State.STANDBY);
// All standby groups overshadowed by the new visible group should be moved to overshadowed
latestStandby
.stream()
.flatMap(group -> findOvershadowedBy(group, State.STANDBY).stream())
.collect(Collectors.toSet())
.forEach(aug -> transitAtomicUpdateGroupState(aug, State.STANDBY, State.OVERSHADOWED));
} else if (augOfRemovedChunk.isEmpty()) {
// Visible is empty. Move the latest overshadowed to visible.
final List<AtomicUpdateGroup<T>> latestOvershadowed = findLatestNonFullyAvailableAtomicUpdateGroups(
findOvershadowedBy(rangeOfAug, minorVersion, State.OVERSHADOWED)
);
if (!latestOvershadowed.isEmpty()) {
latestOvershadowed.forEach(aug -> transitAtomicUpdateGroupState(aug, State.OVERSHADOWED, State.VISIBLE));
}
}
latestFullAugs.forEach(group -> transitAtomicUpdateGroupState(group, State.OVERSHADOWED, State.VISIBLE));
}
}
}
private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroup(
/**
* Find the latest NON-FULLY available atomicUpdateGroups from the given groups.
*
* This method MUST be called only when there is no fully available ones in the given groups. If the given groups
* are in the overshadowed state, calls {@link #findLatestFullyAvailableOvershadowedAtomicUpdateGroups} first
* to check there is any fully available group.
* If the given groups are in the standby state, you can freely call this method because there should be no fully
* available one in the standby groups at any time.
*/
private List<AtomicUpdateGroup<T>> findLatestNonFullyAvailableAtomicUpdateGroups(List<AtomicUpdateGroup<T>> groups)
{
if (groups.isEmpty()) {
return Collections.emptyList();
}
final OvershadowableManager<T> manager = new OvershadowableManager<>(groups);
if (!manager.standbyGroups.isEmpty()) {
throw new ISE("This method should be called only when there is no fully available group in the given state.");
}
final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
visibles.addAll(map.values());
}
return visibles;
}
private List<AtomicUpdateGroup<T>> findLatestFullyAvailableOvershadowedAtomicUpdateGroups(
RootPartitionRange rangeOfAug,
short minorVersion
)
{
final List<Short2ObjectMap.Entry<AtomicUpdateGroup<T>>> overshadowedGroups = findOvershadowedBy(
final List<AtomicUpdateGroup<T>> overshadowedGroups = findOvershadowedBy(
rangeOfAug,
minorVersion,
State.OVERSHADOWED
@ -378,16 +805,22 @@ class OvershadowableManager<T extends Overshadowable<T>>
return Collections.emptyList();
}
final OvershadowableManager<T> manager = new OvershadowableManager<>();
for (Short2ObjectMap.Entry<AtomicUpdateGroup<T>> entry : overshadowedGroups) {
for (PartitionChunk<T> chunk : entry.getValue().getChunks()) {
manager.addChunk(chunk);
}
}
final OvershadowableManager<T> manager = new OvershadowableManager<>(overshadowedGroups);
final List<AtomicUpdateGroup<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> map : manager.visibleGroup.values()) {
visibles.addAll(map.values());
for (AtomicUpdateGroup<T> atomicUpdateGroup : map.values()) {
if (!atomicUpdateGroup.isFull()) {
return Collections.emptyList();
}
visibles.add(atomicUpdateGroup);
}
}
final RootPartitionRange foundRange = RootPartitionRange.of(
visibles.get(0).getStartRootPartitionId(),
visibles.get(visibles.size() - 1).getEndRootPartitionId()
);
if (!rangeOfAug.equals(foundRange)) {
return Collections.emptyList();
}
return visibles;
}
@ -419,7 +852,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
@Nullable
public PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
{
final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber());
if (knownChunk == null) {
@ -461,7 +894,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
@Nullable
public PartitionChunk<T> getChunk(int partitionId)
PartitionChunk<T> getChunk(int partitionId)
{
final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId);
if (chunk == null) {
@ -480,26 +913,33 @@ class OvershadowableManager<T extends Overshadowable<T>>
}
}
public List<PartitionChunk<T>> getVisibles()
List<PartitionChunk<T>> getVisibleChunks()
{
final List<PartitionChunk<T>> visibles = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : visibleGroup.values()) {
for (AtomicUpdateGroup<T> aug : treeMap.values()) {
visibles.addAll(aug.getChunks());
}
}
return visibles;
return getAllChunks(visibleGroup);
}
public List<PartitionChunk<T>> getOvershadowed()
List<PartitionChunk<T>> getOvershadowedChunks()
{
final List<PartitionChunk<T>> overshadowed = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : overshadowedGroups.values()) {
return getAllChunks(overshadowedGroups);
}
@VisibleForTesting
List<PartitionChunk<T>> getStandbyChunks()
{
return getAllChunks(standbyGroups);
}
private List<PartitionChunk<T>> getAllChunks(
TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> stateMap
)
{
final List<PartitionChunk<T>> allChunks = new ArrayList<>();
for (Short2ObjectSortedMap<AtomicUpdateGroup<T>> treeMap : stateMap.values()) {
for (AtomicUpdateGroup<T> aug : treeMap.values()) {
overshadowed.addAll(aug.getChunks());
allChunks.addAll(aug.getChunks());
}
}
return overshadowed;
return allChunks;
}
@Override
@ -535,11 +975,18 @@ class OvershadowableManager<T extends Overshadowable<T>>
'}';
}
private static class RootPartitionRange implements Comparable<RootPartitionRange>
@VisibleForTesting
static class RootPartitionRange implements Comparable<RootPartitionRange>
{
private final short startPartitionId;
private final short endPartitionId;
@VisibleForTesting
static RootPartitionRange of(int startPartitionId, int endPartitionId)
{
return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
}
private static <T extends Overshadowable<T>> RootPartitionRange of(PartitionChunk<T> chunk)
{
return of(chunk.getObject().getStartRootPartitionId(), chunk.getObject().getEndRootPartitionId());
@ -550,11 +997,6 @@ class OvershadowableManager<T extends Overshadowable<T>>
return of(aug.getStartRootPartitionId(), aug.getEndRootPartitionId());
}
private static RootPartitionRange of(int startPartitionId, int endPartitionId)
{
return new RootPartitionRange((short) startPartitionId, (short) endPartitionId);
}
private RootPartitionRange(short startPartitionId, short endPartitionId)
{
this.startPartitionId = startPartitionId;
@ -563,7 +1005,16 @@ class OvershadowableManager<T extends Overshadowable<T>>
public boolean contains(RootPartitionRange that)
{
return this.startPartitionId <= that.startPartitionId && this.endPartitionId >= that.endPartitionId;
return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
&& Short.toUnsignedInt(this.endPartitionId) >= Short.toUnsignedInt(that.endPartitionId);
}
public boolean overlaps(RootPartitionRange that)
{
return Short.toUnsignedInt(startPartitionId) <= Short.toUnsignedInt(that.startPartitionId)
&& Short.toUnsignedInt(endPartitionId) > Short.toUnsignedInt(that.startPartitionId)
|| Short.toUnsignedInt(startPartitionId) >= Short.toUnsignedInt(that.startPartitionId)
&& Short.toUnsignedInt(startPartitionId) < Short.toUnsignedInt(that.endPartitionId);
}
@Override
@ -638,7 +1089,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (toKey > key) {
return this;
} else {
throw new IllegalArgumentException();
throw new IAE("toKey: %s, key: %s", toKey, key);
}
}
@ -648,7 +1099,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
if (fromKey <= key) {
return this;
} else {
throw new IllegalArgumentException();
throw new IAE("fromKey: %s, key: %s", fromKey, key);
}
}

View File

@ -113,13 +113,13 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
@Override
public Iterator<PartitionChunk<T>> iterator()
{
return overshadowableManager.getVisibles().iterator();
return overshadowableManager.getVisibleChunks().iterator();
}
@Override
public Spliterator<PartitionChunk<T>> spliterator()
{
return overshadowableManager.getVisibles().spliterator();
return overshadowableManager.getVisibleChunks().spliterator();
}
public Stream<PartitionChunk<T>> stream()
@ -129,7 +129,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
public List<PartitionChunk<T>> getOvershadowed()
{
return overshadowableManager.getOvershadowed();
return overshadowableManager.getOvershadowedChunks();
}
public Iterable<T> payloads()

View File

@ -32,6 +32,7 @@ import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.NumberedOverwritingPartitionChunk;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.PartitionIds;
@ -48,7 +49,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
@ -2145,116 +2145,4 @@ public class VersionedIntervalTimelineTest
{
return new VersionedIntervalTimeline<>(Ordering.natural());
}
private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
{
private final String majorVersion;
private final int partitionNum;
private final int val;
private final int startRootPartitionId;
private final int endRootPartitionId;
private final short minorVersion;
private final short atomicUpdateGroupSize;
private OvershadowableInteger(String majorVersion, int partitionNum, int val)
{
this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
}
private OvershadowableInteger(
String majorVersion,
int partitionNum,
int val,
int startRootPartitionId,
int endRootPartitionId,
int minorVersion,
int atomicUpdateGroupSize
)
{
this.majorVersion = majorVersion;
this.partitionNum = partitionNum;
this.val = val;
this.startRootPartitionId = startRootPartitionId;
this.endRootPartitionId = endRootPartitionId;
this.minorVersion = (short) minorVersion;
this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OvershadowableInteger that = (OvershadowableInteger) o;
return partitionNum == that.partitionNum &&
val == that.val &&
startRootPartitionId == that.startRootPartitionId &&
endRootPartitionId == that.endRootPartitionId &&
minorVersion == that.minorVersion &&
atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
Objects.equals(majorVersion, that.majorVersion);
}
@Override
public int hashCode()
{
return Objects.hash(
majorVersion,
partitionNum,
val,
startRootPartitionId,
endRootPartitionId,
minorVersion,
atomicUpdateGroupSize
);
}
@Override
public String toString()
{
return "OvershadowableInteger{" +
"majorVersion='" + majorVersion + '\'' +
", partitionNum=" + partitionNum +
", val=" + val +
", startRootPartitionId=" + startRootPartitionId +
", endRootPartitionId=" + endRootPartitionId +
", minorVersion=" + minorVersion +
", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
'}';
}
@Override
public int getStartRootPartitionId()
{
return startRootPartitionId;
}
@Override
public int getEndRootPartitionId()
{
return endRootPartitionId;
}
@Override
public String getVersion()
{
return majorVersion;
}
@Override
public short getMinorVersion()
{
return minorVersion;
}
@Override
public short getAtomicUpdateGroupSize()
{
return atomicUpdateGroupSize;
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.timeline.partition;
import org.apache.druid.timeline.Overshadowable;
import org.junit.Assert;
import org.junit.Test;
@ -110,44 +109,4 @@ public class IntegerPartitionChunkTest
Assert.assertEquals(make(10, null, 0, 1), make(10, null, 0, 1));
Assert.assertEquals(make(10, 11, 0, 1), make(10, 11, 0, 1));
}
private static class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
{
private final int val;
OvershadowableInteger(int val)
{
this.val = val;
}
@Override
public int getStartRootPartitionId()
{
return 0;
}
@Override
public int getEndRootPartitionId()
{
return 1;
}
@Override
public String getVersion()
{
return "";
}
@Override
public short getMinorVersion()
{
return 0;
}
@Override
public short getAtomicUpdateGroupSize()
{
return 1;
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline.partition;
import org.apache.druid.timeline.Overshadowable;
import java.util.Objects;
public class OvershadowableInteger implements Overshadowable<OvershadowableInteger>
{
private final String majorVersion;
private final int partitionNum;
private final int val;
private final int startRootPartitionId;
private final int endRootPartitionId;
private final short minorVersion;
private final short atomicUpdateGroupSize;
public OvershadowableInteger(int val)
{
this("", 0, val);
}
public OvershadowableInteger(String majorVersion, int partitionNum, int val)
{
this(majorVersion, partitionNum, val, partitionNum, partitionNum + 1, 0, 1);
}
public OvershadowableInteger(
String majorVersion,
int partitionNum,
int val,
int startRootPartitionId,
int endRootPartitionId,
int minorVersion,
int atomicUpdateGroupSize
)
{
this.majorVersion = majorVersion;
this.partitionNum = partitionNum;
this.val = val;
this.startRootPartitionId = startRootPartitionId;
this.endRootPartitionId = endRootPartitionId;
this.minorVersion = (short) minorVersion;
this.atomicUpdateGroupSize = (short) atomicUpdateGroupSize;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OvershadowableInteger that = (OvershadowableInteger) o;
return partitionNum == that.partitionNum &&
val == that.val &&
startRootPartitionId == that.startRootPartitionId &&
endRootPartitionId == that.endRootPartitionId &&
minorVersion == that.minorVersion &&
atomicUpdateGroupSize == that.atomicUpdateGroupSize &&
Objects.equals(majorVersion, that.majorVersion);
}
@Override
public int hashCode()
{
return Objects.hash(
majorVersion,
partitionNum,
val,
startRootPartitionId,
endRootPartitionId,
minorVersion,
atomicUpdateGroupSize
);
}
@Override
public String toString()
{
return "OvershadowableInteger{" +
"majorVersion='" + majorVersion + '\'' +
", partitionNum=" + partitionNum +
", val=" + val +
", startRootPartitionId=" + startRootPartitionId +
", endRootPartitionId=" + endRootPartitionId +
", minorVersion=" + minorVersion +
", atomicUpdateGroupSize=" + atomicUpdateGroupSize +
'}';
}
@Override
public int getStartRootPartitionId()
{
return startRootPartitionId;
}
@Override
public int getEndRootPartitionId()
{
return endRootPartitionId;
}
@Override
public String getVersion()
{
return majorVersion;
}
@Override
public short getMinorVersion()
{
return minorVersion;
}
@Override
public short getAtomicUpdateGroupSize()
{
return atomicUpdateGroupSize;
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import java.io.Closeable;
@ -70,9 +71,9 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
}
};
public ReferenceCountingSegment(Segment baseSegment)
public static ReferenceCountingSegment wrapRootGenerationSegment(Segment baseSegment)
{
this(
return new ReferenceCountingSegment(
Preconditions.checkNotNull(baseSegment, "baseSegment"),
baseSegment.getId().getPartitionNum(),
(baseSegment.getId().getPartitionNum() + 1),
@ -81,7 +82,21 @@ public class ReferenceCountingSegment extends AbstractSegment implements Oversha
);
}
public ReferenceCountingSegment(
public static ReferenceCountingSegment wrapSegment(
Segment baseSegment,
ShardSpec shardSpec
)
{
return new ReferenceCountingSegment(
baseSegment,
shardSpec.getStartRootPartitionId(),
shardSpec.getEndRootPartitionId(),
shardSpec.getMinorVersion(),
shardSpec.getAtomicUpdateGroupSize()
);
}
private ReferenceCountingSegment(
Segment baseSegment,
int startRootPartitionId,
int endRootPartitionId,

View File

@ -148,9 +148,21 @@ public class MultiSegmentSelectQueryTest
segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2"));
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment_override)));
timeline.add(
index0.getInterval(),
"v1",
new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
);
timeline.add(
index1.getInterval(),
"v1",
new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
);
timeline.add(
index2.getInterval(),
"v2",
new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment_override))
);
segmentIdentifiers = new ArrayList<>();
for (TimelineObjectHolder<String, ?> holder : timeline.lookup(Intervals.of("2011-01-12/2011-01-14"))) {

View File

@ -145,8 +145,16 @@ public class TimeBoundaryQueryRunnerTest
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment0)));
timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<>(new ReferenceCountingSegment(segment1)));
timeline.add(
index0.getInterval(),
"v1",
new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment0))
);
timeline.add(
index1.getInterval(),
"v1",
new SingleElementPartitionChunk<>(ReferenceCountingSegment.wrapRootGenerationSegment(segment1))
);
return QueryRunnerTestHelper.makeFilteringQueryRunner(timeline, factory);
}

View File

@ -41,7 +41,7 @@ public class ReferenceCountingSegmentTest
@Before
public void setUp()
{
segment = new ReferenceCountingSegment(
segment = ReferenceCountingSegment.wrapRootGenerationSegment(
new AbstractSegment()
{
@Override

View File

@ -43,14 +43,16 @@ public class FireHydrant
public FireHydrant(IncrementalIndex index, int count, SegmentId segmentId)
{
this.index = index;
this.adapter = new AtomicReference<>(new ReferenceCountingSegment(new IncrementalIndexSegment(index, segmentId)));
this.adapter = new AtomicReference<>(
ReferenceCountingSegment.wrapRootGenerationSegment(new IncrementalIndexSegment(index, segmentId))
);
this.count = count;
}
public FireHydrant(Segment adapter, int count)
{
this.index = null;
this.adapter = new AtomicReference<>(new ReferenceCountingSegment(adapter));
this.adapter = new AtomicReference<>(ReferenceCountingSegment.wrapRootGenerationSegment(adapter));
this.count = count;
}
@ -120,7 +122,7 @@ public class FireHydrant
throw new ISE("Cannot swap to the same segment");
}
ReferenceCountingSegment newReferenceCountingSegment =
newSegment != null ? new ReferenceCountingSegment(newSegment) : null;
newSegment != null ? ReferenceCountingSegment.wrapRootGenerationSegment(newSegment) : null;
if (adapter.compareAndSet(currentSegment, newReferenceCountingSegment)) {
if (currentSegment != null) {
currentSegment.close();

View File

@ -172,11 +172,12 @@ public class SegmentManager
log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId());
resultSupplier.set(false);
} else {
final ReferenceCountingSegment referenceCountingSegment = new ReferenceCountingSegment(adapter);
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(referenceCountingSegment)
segment.getShardSpec().createChunk(
ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec())
)
);
dataSourceState.addSegment(segment);
resultSupplier.set(true);
@ -226,15 +227,7 @@ public class SegmentManager
segment.getVersion(),
// remove() internally searches for a partitionChunk to remove which is *equal* to the given
// partitionChunk. Note that partitionChunk.equals() checks only the partitionNum, but not the object.
segment.getShardSpec().createChunk(
new ReferenceCountingSegment(
null,
shardSpec.getStartRootPartitionId(),
shardSpec.getEndRootPartitionId(),
shardSpec.getMinorVersion(),
shardSpec.getAtomicUpdateGroupSize()
)
)
segment.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(null, shardSpec))
);
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();

View File

@ -36,6 +36,8 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@ -383,6 +385,34 @@ public class SegmentManagerTest
Assert.assertNull(segmentManager.getTimeline("nonExisting"));
}
@Test
public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingException
{
final DataSegment segment = new DataSegment(
"small_source",
Intervals.of("0/1000"),
"0",
ImmutableMap.of("interval", Intervals.of("0/1000"), "version", 0),
new ArrayList<>(),
new ArrayList<>(),
new NumberedOverwriteShardSpec(
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + 10,
10,
20,
(short) 1,
(short) 1
),
0,
10
);
segmentManager.loadSegment(segment);
assertResult(ImmutableList.of(segment));
segmentManager.dropSegment(segment);
assertResult(ImmutableList.of());
}
@SuppressWarnings("RedundantThrows") // TODO remove when the bug in intelliJ is fixed.
private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
{
@ -403,7 +433,9 @@ public class SegmentManagerTest
expectedTimeline.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(new ReferenceCountingSegment(segmentLoader.getSegment(segment)))
segment.getShardSpec().createChunk(
ReferenceCountingSegment.wrapSegment(segmentLoader.getSegment(segment), segment.getShardSpec())
)
);
}

View File

@ -84,7 +84,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
timeline.add(
descriptor.getInterval(),
descriptor.getVersion(),
descriptor.getShardSpec().createChunk(new ReferenceCountingSegment(segment))
descriptor.getShardSpec().createChunk(ReferenceCountingSegment.wrapSegment(segment, descriptor.getShardSpec()))
);
segments.add(descriptor);
closeables.add(index);