Avoid listener call under SparseFileTracker#mutex (#61626)

Today we sometimes notify a listener of completion while holding
`SparseFileTracker#mutex`. This commit move all such calls out from
under the mutex and adds assertions that the mutex is not held in the
listener.

Closes #61520
This commit is contained in:
David Turner 2020-08-27 15:03:12 +01:00
parent b6cb590685
commit c89fb8b9fa
1 changed files with 79 additions and 62 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.index.store.cache;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.common.Nullable;
@ -95,6 +96,9 @@ public class SparseFileTracker {
);
}
final ActionListener<Void> wrappedListener = wrapWithAssertions(listener);
final List<Range> requiredRanges;
final List<Gap> gaps = new ArrayList<>();
synchronized (mutex) {
assert invariant();
@ -155,49 +159,46 @@ public class SparseFileTracker {
assert targetRange.start == end : targetRange;
assert invariant();
if (pendingRanges.isEmpty() == false) {
assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges;
assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges;
assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps;
assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges;
assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges;
assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps;
// Pending ranges that needs to be filled before executing the listener
final List<Range> requiredRanges = (start == subRange.v1() && end == subRange.v2())
? pendingRanges
: pendingRanges.stream()
.filter(pendingRange -> pendingRange.start < subRange.v2())
.filter(pendingRange -> subRange.v1() < pendingRange.end)
.sorted(Comparator.comparingLong(r -> r.start))
.collect(Collectors.toList());
switch (requiredRanges.size()) {
case 0:
// no need to wait for the gaps to be filled, the listener can be executed immediately
listener.onResponse(null);
break;
case 1:
final Range requiredRange = requiredRanges.get(0);
requiredRange.completionListener.addListener(
ActionListener.map(listener, progress -> null),
Math.min(requiredRange.end, subRange.v2())
);
break;
default:
final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
ActionListener.map(listener, progress -> null),
requiredRanges.size()
);
requiredRanges.forEach(
r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, subRange.v2()))
);
}
return Collections.unmodifiableList(gaps);
}
// Pending ranges that needs to be filled before executing the listener
requiredRanges = (start == subRange.v1() && end == subRange.v2())
? pendingRanges
: pendingRanges.stream()
.filter(pendingRange -> pendingRange.start < subRange.v2())
.filter(pendingRange -> subRange.v1() < pendingRange.end)
.sorted(Comparator.comparingLong(r -> r.start))
.collect(Collectors.toList());
}
assert gaps.isEmpty(); // or else pendingRanges.isEmpty() == false so we already returned
listener.onResponse(null);
return Collections.emptyList();
// NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
// there is no risk of concurrent modification.
switch (requiredRanges.size()) {
case 0:
// no need to wait for the gaps to be filled, the listener can be executed immediately
wrappedListener.onResponse(null);
break;
case 1:
final Range requiredRange = requiredRanges.get(0);
requiredRange.completionListener.addListener(
ActionListener.map(wrappedListener, progress -> null),
Math.min(requiredRange.completionListener.end, subRange.v2())
);
break;
default:
final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
ActionListener.map(wrappedListener, progress -> null),
requiredRanges.size()
);
requiredRanges.forEach(
r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.v2()))
);
}
return Collections.unmodifiableList(gaps);
}
/**
@ -218,11 +219,12 @@ public class SparseFileTracker {
throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]");
}
final ActionListener<Void> wrappedListener = wrapWithAssertions(listener);
final List<Range> pendingRanges = new ArrayList<>();
synchronized (mutex) {
assert invariant();
final List<Range> pendingRanges = new ArrayList<>();
final Range targetRange = new Range(start, end, null);
final SortedSet<Range> earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts
if (earlierRanges.isEmpty() == false) {
@ -259,31 +261,46 @@ public class SparseFileTracker {
assert targetRange.start == targetRange.end : targetRange;
assert targetRange.start == end : targetRange;
assert invariant();
switch (pendingRanges.size()) {
case 0:
break;
case 1:
final Range pendingRange = pendingRanges.get(0);
pendingRange.completionListener.addListener(
ActionListener.map(listener, progress -> null),
Math.min(pendingRange.end, end)
);
return true;
default:
final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
ActionListener.map(listener, progress -> null),
pendingRanges.size()
);
pendingRanges.forEach(r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, end)));
return true;
}
}
listener.onResponse(null);
// NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
// there is no risk of concurrent modification.
switch (pendingRanges.size()) {
case 0:
wrappedListener.onResponse(null);
break;
case 1:
final Range pendingRange = pendingRanges.get(0);
pendingRange.completionListener.addListener(
ActionListener.map(wrappedListener, progress -> null),
Math.min(pendingRange.completionListener.end, end)
);
return true;
default:
final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
ActionListener.map(wrappedListener, progress -> null),
pendingRanges.size()
);
pendingRanges.forEach(
r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, end))
);
return true;
}
return true;
}
private ActionListener<Void> wrapWithAssertions(ActionListener<Void> listener) {
if (Assertions.ENABLED) {
return ActionListener.runAfter(
listener,
() -> { assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held in listener"; }
);
} else {
return listener;
}
}
/**
* Returns a range that contains all bytes of the target range which are absent (possibly pending). The returned range may include
* some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis. This method does