SparseFileTracker.Gap can keep a reference to the corresponding range it is about to fill, it does not need to resolve the range each time onSuccess/onProgress/onFailure are called. Relates #58477
This commit is contained in:
parent
ffcbf9ca0c
commit
73adcf4d44
|
@ -285,8 +285,8 @@ public class CacheFile {
|
||||||
for (SparseFileTracker.Gap gap : gaps) {
|
for (SparseFileTracker.Gap gap : gaps) {
|
||||||
try {
|
try {
|
||||||
ensureOpen();
|
ensureOpen();
|
||||||
onRangeMissing.accept(gap.start, gap.end);
|
onRangeMissing.accept(gap.start(), gap.end());
|
||||||
gap.onProgress(gap.end); // TODO update progress in onRangeMissing
|
gap.onProgress(gap.end()); // TODO update progress in onRangeMissing
|
||||||
gap.onCompletion();
|
gap.onCompletion();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
gap.onFailure(e);
|
gap.onFailure(e);
|
||||||
|
|
|
@ -126,7 +126,7 @@ public class SparseFileTracker {
|
||||||
);
|
);
|
||||||
ranges.add(newPendingRange);
|
ranges.add(newPendingRange);
|
||||||
pendingRanges.add(newPendingRange);
|
pendingRanges.add(newPendingRange);
|
||||||
gaps.add(new Gap(targetRange.start, end));
|
gaps.add(new Gap(newPendingRange));
|
||||||
targetRange.start = end;
|
targetRange.start = end;
|
||||||
} else {
|
} else {
|
||||||
final Range firstExistingRange = existingRanges.first();
|
final Range firstExistingRange = existingRanges.first();
|
||||||
|
@ -146,7 +146,7 @@ public class SparseFileTracker {
|
||||||
);
|
);
|
||||||
ranges.add(newPendingRange);
|
ranges.add(newPendingRange);
|
||||||
pendingRanges.add(newPendingRange);
|
pendingRanges.add(newPendingRange);
|
||||||
gaps.add(new Gap(targetRange.start, newPendingRange.end));
|
gaps.add(new Gap(newPendingRange));
|
||||||
targetRange.start = newPendingRange.end;
|
targetRange.start = newPendingRange.end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -255,90 +255,82 @@ public class SparseFileTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onGapSuccess(final long start, final long end) {
|
private boolean assertPendingRangeExists(Range range) {
|
||||||
|
assert Thread.holdsLock(mutex);
|
||||||
|
final SortedSet<Range> existingRanges = ranges.tailSet(range);
|
||||||
|
assert existingRanges.isEmpty() == false;
|
||||||
|
final Range existingRange = existingRanges.first();
|
||||||
|
assert existingRange == range;
|
||||||
|
assert existingRange.isPending();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onGapSuccess(final Range gapRange) {
|
||||||
final ProgressListenableActionFuture completionListener;
|
final ProgressListenableActionFuture completionListener;
|
||||||
|
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
assert invariant();
|
assert invariant();
|
||||||
|
assert assertPendingRangeExists(gapRange);
|
||||||
|
completionListener = gapRange.completionListener;
|
||||||
|
ranges.remove(gapRange);
|
||||||
|
|
||||||
final Range range = new Range(start, end, null);
|
final SortedSet<Range> prevRanges = ranges.headSet(gapRange);
|
||||||
final SortedSet<Range> existingRanges = ranges.tailSet(range);
|
|
||||||
assert existingRanges.isEmpty() == false;
|
|
||||||
|
|
||||||
final Range existingRange = existingRanges.first();
|
|
||||||
assert existingRange.start == start && existingRange.end == end && existingRange.isPending();
|
|
||||||
completionListener = existingRange.completionListener;
|
|
||||||
ranges.remove(existingRange);
|
|
||||||
|
|
||||||
final SortedSet<Range> prevRanges = ranges.headSet(existingRange);
|
|
||||||
final Range prevRange = prevRanges.isEmpty() ? null : prevRanges.last();
|
final Range prevRange = prevRanges.isEmpty() ? null : prevRanges.last();
|
||||||
assert prevRange == null || prevRange.end <= existingRange.start : prevRange + " vs " + existingRange;
|
assert prevRange == null || prevRange.end <= gapRange.start : prevRange + " vs " + gapRange;
|
||||||
final boolean mergeWithPrev = prevRange != null && prevRange.isPending() == false && prevRange.end == existingRange.start;
|
final boolean mergeWithPrev = prevRange != null && prevRange.isPending() == false && prevRange.end == gapRange.start;
|
||||||
|
|
||||||
final SortedSet<Range> nextRanges = ranges.tailSet(existingRange);
|
final SortedSet<Range> nextRanges = ranges.tailSet(gapRange);
|
||||||
final Range nextRange = nextRanges.isEmpty() ? null : nextRanges.first();
|
final Range nextRange = nextRanges.isEmpty() ? null : nextRanges.first();
|
||||||
assert nextRange == null || existingRange.end <= nextRange.start : existingRange + " vs " + nextRange;
|
assert nextRange == null || gapRange.end <= nextRange.start : gapRange + " vs " + nextRange;
|
||||||
final boolean mergeWithNext = nextRange != null && nextRange.isPending() == false && existingRange.end == nextRange.start;
|
final boolean mergeWithNext = nextRange != null && nextRange.isPending() == false && gapRange.end == nextRange.start;
|
||||||
|
|
||||||
if (mergeWithPrev && mergeWithNext) {
|
if (mergeWithPrev && mergeWithNext) {
|
||||||
assert prevRange.isPending() == false : prevRange;
|
assert prevRange.isPending() == false : prevRange;
|
||||||
assert nextRange.isPending() == false : nextRange;
|
assert nextRange.isPending() == false : nextRange;
|
||||||
assert prevRange.end == existingRange.start : prevRange + " vs " + existingRange;
|
assert prevRange.end == gapRange.start : prevRange + " vs " + gapRange;
|
||||||
assert existingRange.end == nextRange.start : existingRange + " vs " + nextRange;
|
assert gapRange.end == nextRange.start : gapRange + " vs " + nextRange;
|
||||||
prevRange.end = nextRange.end;
|
prevRange.end = nextRange.end;
|
||||||
ranges.remove(nextRange);
|
ranges.remove(nextRange);
|
||||||
} else if (mergeWithPrev) {
|
} else if (mergeWithPrev) {
|
||||||
assert prevRange.isPending() == false : prevRange;
|
assert prevRange.isPending() == false : prevRange;
|
||||||
assert prevRange.end == existingRange.start : prevRange + " vs " + existingRange;
|
assert prevRange.end == gapRange.start : prevRange + " vs " + gapRange;
|
||||||
prevRange.end = existingRange.end;
|
prevRange.end = gapRange.end;
|
||||||
} else if (mergeWithNext) {
|
} else if (mergeWithNext) {
|
||||||
assert nextRange.isPending() == false : nextRange;
|
assert nextRange.isPending() == false : nextRange;
|
||||||
assert existingRange.end == nextRange.start : existingRange + " vs " + nextRange;
|
assert gapRange.end == nextRange.start : gapRange + " vs " + nextRange;
|
||||||
nextRange.start = existingRange.start;
|
nextRange.start = gapRange.start;
|
||||||
} else {
|
} else {
|
||||||
ranges.add(new Range(start, end, null));
|
ranges.add(new Range(gapRange.start, gapRange.end, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
assert invariant();
|
assert invariant();
|
||||||
}
|
}
|
||||||
|
|
||||||
completionListener.onResponse(end);
|
completionListener.onResponse(gapRange.end);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onGapProgress(long start, long end, long value) {
|
private void onGapProgress(final Range gapRange, long value) {
|
||||||
final ProgressListenableActionFuture completionListener;
|
final ProgressListenableActionFuture completionListener;
|
||||||
|
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
assert invariant();
|
assert invariant();
|
||||||
|
assert assertPendingRangeExists(gapRange);
|
||||||
final Range range = new Range(start, end, null);
|
completionListener = gapRange.completionListener;
|
||||||
final SortedSet<Range> existingRanges = ranges.tailSet(range);
|
|
||||||
assert existingRanges.isEmpty() == false;
|
|
||||||
|
|
||||||
final Range existingRange = existingRanges.first();
|
|
||||||
assert existingRange.start == start && existingRange.end == end && existingRange.isPending();
|
|
||||||
completionListener = existingRange.completionListener;
|
|
||||||
assert invariant();
|
assert invariant();
|
||||||
}
|
}
|
||||||
|
|
||||||
completionListener.onProgress(value);
|
completionListener.onProgress(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onGapFailure(long start, long end, Exception e) {
|
private void onGapFailure(final Range gapRange, Exception e) {
|
||||||
final ProgressListenableActionFuture completionListener;
|
final ProgressListenableActionFuture completionListener;
|
||||||
|
|
||||||
synchronized (mutex) {
|
synchronized (mutex) {
|
||||||
assert invariant();
|
assert invariant();
|
||||||
|
assert assertPendingRangeExists(gapRange);
|
||||||
final Range range = new Range(start, end, null);
|
completionListener = gapRange.completionListener;
|
||||||
final SortedSet<Range> existingRanges = ranges.tailSet(range);
|
final boolean removed = ranges.remove(gapRange);
|
||||||
assert existingRanges.isEmpty() == false;
|
assert removed : gapRange + " not found";
|
||||||
|
|
||||||
final Range existingRange = existingRanges.first();
|
|
||||||
assert existingRange.start == start && existingRange.end == end && existingRange.isPending();
|
|
||||||
completionListener = existingRange.completionListener;
|
|
||||||
ranges.remove(existingRange);
|
|
||||||
|
|
||||||
assert invariant();
|
assert invariant();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,37 +378,40 @@ public class SparseFileTracker {
|
||||||
* Represents a gap in the file that a client should fill in.
|
* Represents a gap in the file that a client should fill in.
|
||||||
*/
|
*/
|
||||||
public class Gap {
|
public class Gap {
|
||||||
/**
|
|
||||||
* Inclusive start point of this range
|
|
||||||
*/
|
|
||||||
public final long start;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Exclusive end point of this range
|
* Range in the file corresponding to the current gap
|
||||||
*/
|
*/
|
||||||
public final long end;
|
public final Range range;
|
||||||
|
|
||||||
Gap(long start, long end) {
|
Gap(Range range) {
|
||||||
assert start < end : start + "-" + end;
|
assert range.start < range.end : range.start + "-" + range.end;
|
||||||
this.start = start;
|
this.range = range;
|
||||||
this.end = end;
|
}
|
||||||
|
|
||||||
|
public long start() {
|
||||||
|
return range.start;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long end() {
|
||||||
|
return range.end;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onCompletion() {
|
public void onCompletion() {
|
||||||
onGapSuccess(start, end);
|
onGapSuccess(range);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onProgress(long value) {
|
public void onProgress(long value) {
|
||||||
onGapProgress(start, end, value);
|
onGapProgress(range, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
onGapFailure(start, end, e);
|
onGapFailure(range, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return SparseFileTracker.this.toString() + " [" + start + "-" + end + "]";
|
return SparseFileTracker.this.toString() + ' ' + range;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -153,15 +153,15 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
}, e -> { throw new AssertionError(e); }));
|
}, e -> { throw new AssertionError(e); }));
|
||||||
for (int gapIndex = 0; gapIndex < gaps.size(); gapIndex++) {
|
for (int gapIndex = 0; gapIndex < gaps.size(); gapIndex++) {
|
||||||
final SparseFileTracker.Gap gap = gaps.get(gapIndex);
|
final SparseFileTracker.Gap gap = gaps.get(gapIndex);
|
||||||
assertThat(gap.start, greaterThanOrEqualTo(start));
|
assertThat(gap.start(), greaterThanOrEqualTo(start));
|
||||||
assertThat(gap.end, lessThanOrEqualTo(end));
|
assertThat(gap.end(), lessThanOrEqualTo(end));
|
||||||
// listener is notified when the last gap is completed
|
// listener is notified when the last gap is completed
|
||||||
final AtomicBoolean shouldNotifyListener = new AtomicBoolean();
|
final AtomicBoolean shouldNotifyListener = new AtomicBoolean();
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||||
// listener is notified when the progress reached the last byte of the last gap
|
// listener is notified when the progress reached the last byte of the last gap
|
||||||
if ((gapIndex == gaps.size() - 1) && (i == gap.end - 1L)) {
|
if ((gapIndex == gaps.size() - 1) && (i == gap.end() - 1L)) {
|
||||||
assertTrue(shouldNotifyListener.compareAndSet(false, true));
|
assertTrue(shouldNotifyListener.compareAndSet(false, true));
|
||||||
expectNotification.set(true);
|
expectNotification.set(true);
|
||||||
}
|
}
|
||||||
|
@ -232,10 +232,10 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
|
|
||||||
for (final SparseFileTracker.Gap gap : gaps) {
|
for (final SparseFileTracker.Gap gap : gaps) {
|
||||||
assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
assertThat(gap.start(), greaterThanOrEqualTo(range.v1()));
|
||||||
assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
assertThat(gap.end(), lessThanOrEqualTo(range.v2()));
|
||||||
|
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||||
assertTrue(wasNotified.get());
|
assertTrue(wasNotified.get());
|
||||||
|
@ -263,10 +263,10 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
assertThat(triggeringProgress, greaterThanOrEqualTo(0L));
|
assertThat(triggeringProgress, greaterThanOrEqualTo(0L));
|
||||||
|
|
||||||
for (final SparseFileTracker.Gap gap : gaps) {
|
for (final SparseFileTracker.Gap gap : gaps) {
|
||||||
assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
assertThat(gap.start(), greaterThanOrEqualTo(range.v1()));
|
||||||
assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
assertThat(gap.end(), lessThanOrEqualTo(range.v2()));
|
||||||
|
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||||
if (triggeringProgress == i) {
|
if (triggeringProgress == i) {
|
||||||
|
@ -303,7 +303,7 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
+ gap
|
+ gap
|
||||||
+ "] was completed",
|
+ "] was completed",
|
||||||
wasNotified.get(),
|
wasNotified.get(),
|
||||||
equalTo(triggeringProgress < gap.end)
|
equalTo(triggeringProgress < gap.end())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
assertTrue(wasNotified.get());
|
assertTrue(wasNotified.get());
|
||||||
|
@ -454,7 +454,7 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
|
|
||||||
for (final SparseFileTracker.Gap gap : gaps) {
|
for (final SparseFileTracker.Gap gap : gaps) {
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
assertThat(Long.toString(i), fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||||
}
|
}
|
||||||
gapConsumer.accept(gap);
|
gapConsumer.accept(gap);
|
||||||
|
@ -462,14 +462,14 @@ public class SparseFileTrackerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void processGap(byte[] fileContents, SparseFileTracker.Gap gap) {
|
private static void processGap(byte[] fileContents, SparseFileTracker.Gap gap) {
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
gap.onFailure(new ElasticsearchException("simulated"));
|
gap.onFailure(new ElasticsearchException("simulated"));
|
||||||
} else {
|
} else {
|
||||||
for (long i = gap.start; i < gap.end; i++) {
|
for (long i = gap.start(); i < gap.end(); i++) {
|
||||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||||
gap.onProgress(i + 1L);
|
gap.onProgress(i + 1L);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue