diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index 89aac61f560..86d5acd2ef7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -285,8 +285,8 @@ public class CacheFile { for (SparseFileTracker.Gap gap : gaps) { try { ensureOpen(); - onRangeMissing.accept(gap.start, gap.end); - gap.onProgress(gap.end); // TODO update progress in onRangeMissing + onRangeMissing.accept(gap.start(), gap.end()); + gap.onProgress(gap.end()); // TODO update progress in onRangeMissing gap.onCompletion(); } catch (Exception e) { gap.onFailure(e); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java index 932db09a3da..feeb55152e3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java @@ -126,7 +126,7 @@ public class SparseFileTracker { ); ranges.add(newPendingRange); pendingRanges.add(newPendingRange); - gaps.add(new Gap(targetRange.start, end)); + gaps.add(new Gap(newPendingRange)); targetRange.start = end; } else { final Range firstExistingRange = existingRanges.first(); @@ -146,7 +146,7 @@ public class SparseFileTracker { ); ranges.add(newPendingRange); pendingRanges.add(newPendingRange); - gaps.add(new Gap(targetRange.start, newPendingRange.end)); + gaps.add(new Gap(newPendingRange)); targetRange.start = newPendingRange.end; } } @@ -255,90 +255,82 @@ public class SparseFileTracker { } } - private void onGapSuccess(final long start, final long end) { + private boolean assertPendingRangeExists(Range range) { + assert Thread.holdsLock(mutex); + final SortedSet existingRanges = ranges.tailSet(range); + assert existingRanges.isEmpty() == false; + final Range existingRange = existingRanges.first(); + assert existingRange == range; + assert existingRange.isPending(); + return true; + } + + private void onGapSuccess(final Range gapRange) { final ProgressListenableActionFuture completionListener; synchronized (mutex) { assert invariant(); + assert assertPendingRangeExists(gapRange); + completionListener = gapRange.completionListener; + ranges.remove(gapRange); - final Range range = new Range(start, end, null); - final SortedSet existingRanges = ranges.tailSet(range); - assert existingRanges.isEmpty() == false; - - final Range existingRange = existingRanges.first(); - assert existingRange.start == start && existingRange.end == end && existingRange.isPending(); - completionListener = existingRange.completionListener; - ranges.remove(existingRange); - - final SortedSet prevRanges = ranges.headSet(existingRange); + final SortedSet prevRanges = ranges.headSet(gapRange); final Range prevRange = prevRanges.isEmpty() ? null : prevRanges.last(); - assert prevRange == null || prevRange.end <= existingRange.start : prevRange + " vs " + existingRange; - final boolean mergeWithPrev = prevRange != null && prevRange.isPending() == false && prevRange.end == existingRange.start; + assert prevRange == null || prevRange.end <= gapRange.start : prevRange + " vs " + gapRange; + final boolean mergeWithPrev = prevRange != null && prevRange.isPending() == false && prevRange.end == gapRange.start; - final SortedSet nextRanges = ranges.tailSet(existingRange); + final SortedSet nextRanges = ranges.tailSet(gapRange); final Range nextRange = nextRanges.isEmpty() ? null : nextRanges.first(); - assert nextRange == null || existingRange.end <= nextRange.start : existingRange + " vs " + nextRange; - final boolean mergeWithNext = nextRange != null && nextRange.isPending() == false && existingRange.end == nextRange.start; + assert nextRange == null || gapRange.end <= nextRange.start : gapRange + " vs " + nextRange; + final boolean mergeWithNext = nextRange != null && nextRange.isPending() == false && gapRange.end == nextRange.start; if (mergeWithPrev && mergeWithNext) { assert prevRange.isPending() == false : prevRange; assert nextRange.isPending() == false : nextRange; - assert prevRange.end == existingRange.start : prevRange + " vs " + existingRange; - assert existingRange.end == nextRange.start : existingRange + " vs " + nextRange; + assert prevRange.end == gapRange.start : prevRange + " vs " + gapRange; + assert gapRange.end == nextRange.start : gapRange + " vs " + nextRange; prevRange.end = nextRange.end; ranges.remove(nextRange); } else if (mergeWithPrev) { assert prevRange.isPending() == false : prevRange; - assert prevRange.end == existingRange.start : prevRange + " vs " + existingRange; - prevRange.end = existingRange.end; + assert prevRange.end == gapRange.start : prevRange + " vs " + gapRange; + prevRange.end = gapRange.end; } else if (mergeWithNext) { assert nextRange.isPending() == false : nextRange; - assert existingRange.end == nextRange.start : existingRange + " vs " + nextRange; - nextRange.start = existingRange.start; + assert gapRange.end == nextRange.start : gapRange + " vs " + nextRange; + nextRange.start = gapRange.start; } else { - ranges.add(new Range(start, end, null)); + ranges.add(new Range(gapRange.start, gapRange.end, null)); } assert invariant(); } - completionListener.onResponse(end); + completionListener.onResponse(gapRange.end); } - private void onGapProgress(long start, long end, long value) { + private void onGapProgress(final Range gapRange, long value) { final ProgressListenableActionFuture completionListener; synchronized (mutex) { assert invariant(); - - final Range range = new Range(start, end, null); - final SortedSet existingRanges = ranges.tailSet(range); - assert existingRanges.isEmpty() == false; - - final Range existingRange = existingRanges.first(); - assert existingRange.start == start && existingRange.end == end && existingRange.isPending(); - completionListener = existingRange.completionListener; + assert assertPendingRangeExists(gapRange); + completionListener = gapRange.completionListener; assert invariant(); } completionListener.onProgress(value); } - private void onGapFailure(long start, long end, Exception e) { + private void onGapFailure(final Range gapRange, Exception e) { final ProgressListenableActionFuture completionListener; synchronized (mutex) { assert invariant(); - - final Range range = new Range(start, end, null); - final SortedSet existingRanges = ranges.tailSet(range); - assert existingRanges.isEmpty() == false; - - final Range existingRange = existingRanges.first(); - assert existingRange.start == start && existingRange.end == end && existingRange.isPending(); - completionListener = existingRange.completionListener; - ranges.remove(existingRange); - + assert assertPendingRangeExists(gapRange); + completionListener = gapRange.completionListener; + final boolean removed = ranges.remove(gapRange); + assert removed : gapRange + " not found"; assert invariant(); } @@ -386,37 +378,40 @@ public class SparseFileTracker { * Represents a gap in the file that a client should fill in. */ public class Gap { - /** - * Inclusive start point of this range - */ - public final long start; /** - * Exclusive end point of this range + * Range in the file corresponding to the current gap */ - public final long end; + public final Range range; - Gap(long start, long end) { - assert start < end : start + "-" + end; - this.start = start; - this.end = end; + Gap(Range range) { + assert range.start < range.end : range.start + "-" + range.end; + this.range = range; + } + + public long start() { + return range.start; + } + + public long end() { + return range.end; } public void onCompletion() { - onGapSuccess(start, end); + onGapSuccess(range); } public void onProgress(long value) { - onGapProgress(start, end, value); + onGapProgress(range, value); } public void onFailure(Exception e) { - onGapFailure(start, end, e); + onGapFailure(range, e); } @Override public String toString() { - return SparseFileTracker.this.toString() + " [" + start + "-" + end + "]"; + return SparseFileTracker.this.toString() + ' ' + range; } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index 91ab6ecbf8a..808613fb52a 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -153,15 +153,15 @@ public class SparseFileTrackerTests extends ESTestCase { }, e -> { throw new AssertionError(e); })); for (int gapIndex = 0; gapIndex < gaps.size(); gapIndex++) { final SparseFileTracker.Gap gap = gaps.get(gapIndex); - assertThat(gap.start, greaterThanOrEqualTo(start)); - assertThat(gap.end, lessThanOrEqualTo(end)); + assertThat(gap.start(), greaterThanOrEqualTo(start)); + assertThat(gap.end(), lessThanOrEqualTo(end)); // listener is notified when the last gap is completed final AtomicBoolean shouldNotifyListener = new AtomicBoolean(); - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); fileContents[Math.toIntExact(i)] = AVAILABLE; // listener is notified when the progress reached the last byte of the last gap - if ((gapIndex == gaps.size() - 1) && (i == gap.end - 1L)) { + if ((gapIndex == gaps.size() - 1) && (i == gap.end() - 1L)) { assertTrue(shouldNotifyListener.compareAndSet(false, true)); expectNotification.set(true); } @@ -232,10 +232,10 @@ public class SparseFileTrackerTests extends ESTestCase { ); for (final SparseFileTracker.Gap gap : gaps) { - assertThat(gap.start, greaterThanOrEqualTo(range.v1())); - assertThat(gap.end, lessThanOrEqualTo(range.v2())); + assertThat(gap.start(), greaterThanOrEqualTo(range.v1())); + assertThat(gap.end(), lessThanOrEqualTo(range.v2())); - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); fileContents[Math.toIntExact(i)] = AVAILABLE; assertTrue(wasNotified.get()); @@ -263,10 +263,10 @@ public class SparseFileTrackerTests extends ESTestCase { assertThat(triggeringProgress, greaterThanOrEqualTo(0L)); for (final SparseFileTracker.Gap gap : gaps) { - assertThat(gap.start, greaterThanOrEqualTo(range.v1())); - assertThat(gap.end, lessThanOrEqualTo(range.v2())); + assertThat(gap.start(), greaterThanOrEqualTo(range.v1())); + assertThat(gap.end(), lessThanOrEqualTo(range.v2())); - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); fileContents[Math.toIntExact(i)] = AVAILABLE; if (triggeringProgress == i) { @@ -303,7 +303,7 @@ public class SparseFileTrackerTests extends ESTestCase { + gap + "] was completed", wasNotified.get(), - equalTo(triggeringProgress < gap.end) + equalTo(triggeringProgress < gap.end()) ); } assertTrue(wasNotified.get()); @@ -454,7 +454,7 @@ public class SparseFileTrackerTests extends ESTestCase { ); for (final SparseFileTracker.Gap gap : gaps) { - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); } gapConsumer.accept(gap); @@ -462,14 +462,14 @@ public class SparseFileTrackerTests extends ESTestCase { } private static void processGap(byte[] fileContents, SparseFileTracker.Gap gap) { - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE)); } if (randomBoolean()) { gap.onFailure(new ElasticsearchException("simulated")); } else { - for (long i = gap.start; i < gap.end; i++) { + for (long i = gap.start(); i < gap.end(); i++) { fileContents[Math.toIntExact(i)] = AVAILABLE; gap.onProgress(i + 1L); }