From c89fb8b9faa2e37fbe0ad7fdf806ce2b43070712 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 27 Aug 2020 15:03:12 +0100 Subject: [PATCH] Avoid listener call under SparseFileTracker#mutex (#61626) Today we sometimes notify a listener of completion while holding `SparseFileTracker#mutex`. This commit move all such calls out from under the mutex and adds assertions that the mutex is not held in the listener. Closes #61520 --- .../index/store/cache/SparseFileTracker.java | 141 ++++++++++-------- 1 file changed, 79 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java index 9e9217688b6..8b6cfdd9454 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.index.store.cache; +import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.common.Nullable; @@ -95,6 +96,9 @@ public class SparseFileTracker { ); } + final ActionListener wrappedListener = wrapWithAssertions(listener); + final List requiredRanges; + final List gaps = new ArrayList<>(); synchronized (mutex) { assert invariant(); @@ -155,49 +159,46 @@ public class SparseFileTracker { assert targetRange.start == end : targetRange; assert invariant(); - if (pendingRanges.isEmpty() == false) { - assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges; - assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges; - assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps; + assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges; + assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges; + assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps; - // Pending ranges that needs to be filled before executing the listener - final List requiredRanges = (start == subRange.v1() && end == subRange.v2()) - ? pendingRanges - : pendingRanges.stream() - .filter(pendingRange -> pendingRange.start < subRange.v2()) - .filter(pendingRange -> subRange.v1() < pendingRange.end) - .sorted(Comparator.comparingLong(r -> r.start)) - .collect(Collectors.toList()); - - switch (requiredRanges.size()) { - case 0: - // no need to wait for the gaps to be filled, the listener can be executed immediately - listener.onResponse(null); - break; - case 1: - final Range requiredRange = requiredRanges.get(0); - requiredRange.completionListener.addListener( - ActionListener.map(listener, progress -> null), - Math.min(requiredRange.end, subRange.v2()) - ); - break; - default: - final GroupedActionListener groupedActionListener = new GroupedActionListener<>( - ActionListener.map(listener, progress -> null), - requiredRanges.size() - ); - requiredRanges.forEach( - r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, subRange.v2())) - ); - } - - return Collections.unmodifiableList(gaps); - } + // Pending ranges that needs to be filled before executing the listener + requiredRanges = (start == subRange.v1() && end == subRange.v2()) + ? pendingRanges + : pendingRanges.stream() + .filter(pendingRange -> pendingRange.start < subRange.v2()) + .filter(pendingRange -> subRange.v1() < pendingRange.end) + .sorted(Comparator.comparingLong(r -> r.start)) + .collect(Collectors.toList()); } - assert gaps.isEmpty(); // or else pendingRanges.isEmpty() == false so we already returned - listener.onResponse(null); - return Collections.emptyList(); + // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so + // there is no risk of concurrent modification. + + switch (requiredRanges.size()) { + case 0: + // no need to wait for the gaps to be filled, the listener can be executed immediately + wrappedListener.onResponse(null); + break; + case 1: + final Range requiredRange = requiredRanges.get(0); + requiredRange.completionListener.addListener( + ActionListener.map(wrappedListener, progress -> null), + Math.min(requiredRange.completionListener.end, subRange.v2()) + ); + break; + default: + final GroupedActionListener groupedActionListener = new GroupedActionListener<>( + ActionListener.map(wrappedListener, progress -> null), + requiredRanges.size() + ); + requiredRanges.forEach( + r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.v2())) + ); + } + + return Collections.unmodifiableList(gaps); } /** @@ -218,11 +219,12 @@ public class SparseFileTracker { throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]"); } + final ActionListener wrappedListener = wrapWithAssertions(listener); + final List pendingRanges = new ArrayList<>(); + synchronized (mutex) { assert invariant(); - final List pendingRanges = new ArrayList<>(); - final Range targetRange = new Range(start, end, null); final SortedSet earlierRanges = ranges.headSet(targetRange, false); // ranges with strictly earlier starts if (earlierRanges.isEmpty() == false) { @@ -259,31 +261,46 @@ public class SparseFileTracker { assert targetRange.start == targetRange.end : targetRange; assert targetRange.start == end : targetRange; assert invariant(); - - switch (pendingRanges.size()) { - case 0: - break; - case 1: - final Range pendingRange = pendingRanges.get(0); - pendingRange.completionListener.addListener( - ActionListener.map(listener, progress -> null), - Math.min(pendingRange.end, end) - ); - return true; - default: - final GroupedActionListener groupedActionListener = new GroupedActionListener<>( - ActionListener.map(listener, progress -> null), - pendingRanges.size() - ); - pendingRanges.forEach(r -> r.completionListener.addListener(groupedActionListener, Math.min(r.end, end))); - return true; - } } - listener.onResponse(null); + // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so + // there is no risk of concurrent modification. + + switch (pendingRanges.size()) { + case 0: + wrappedListener.onResponse(null); + break; + case 1: + final Range pendingRange = pendingRanges.get(0); + pendingRange.completionListener.addListener( + ActionListener.map(wrappedListener, progress -> null), + Math.min(pendingRange.completionListener.end, end) + ); + return true; + default: + final GroupedActionListener groupedActionListener = new GroupedActionListener<>( + ActionListener.map(wrappedListener, progress -> null), + pendingRanges.size() + ); + pendingRanges.forEach( + r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, end)) + ); + return true; + } return true; } + private ActionListener wrapWithAssertions(ActionListener listener) { + if (Assertions.ENABLED) { + return ActionListener.runAfter( + listener, + () -> { assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held in listener"; } + ); + } else { + return listener; + } + } + /** * Returns a range that contains all bytes of the target range which are absent (possibly pending). The returned range may include * some ranges of present bytes. It tries to return the smallest possible range, but does so on a best-effort basis. This method does