Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on. Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
c06c89d3db
commit
775fb5d4cf
|
@ -49,8 +49,8 @@ public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> im
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void done() {
|
||||
super.done();
|
||||
protected void done(boolean success) {
|
||||
super.done(success);
|
||||
synchronized (this) {
|
||||
executedListeners = true;
|
||||
}
|
||||
|
|
|
@ -110,7 +110,7 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
if (!sync.cancel()) {
|
||||
return false;
|
||||
}
|
||||
done();
|
||||
done(false);
|
||||
if (mayInterruptIfRunning) {
|
||||
interruptTask();
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
/**
|
||||
* Subclasses should invoke this method to set the result of the computation
|
||||
* to {@code value}. This will set the state of the future to
|
||||
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
|
||||
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
|
||||
* state was successfully changed.
|
||||
*
|
||||
* @param value the value that was the result of the task.
|
||||
|
@ -141,7 +141,7 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
protected boolean set(@Nullable V value) {
|
||||
boolean result = sync.set(value);
|
||||
if (result) {
|
||||
done();
|
||||
done(true);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
/**
|
||||
* Subclasses should invoke this method to set the result of the computation
|
||||
* to an error, {@code throwable}. This will set the state of the future to
|
||||
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
|
||||
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
|
||||
* state was successfully changed.
|
||||
*
|
||||
* @param throwable the exception that the task failed with.
|
||||
|
@ -159,7 +159,7 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
protected boolean setException(Throwable throwable) {
|
||||
boolean result = sync.setException(Objects.requireNonNull(throwable));
|
||||
if (result) {
|
||||
done();
|
||||
done(false);
|
||||
}
|
||||
|
||||
// If it's an Error, we want to make sure it reaches the top of the
|
||||
|
@ -173,7 +173,14 @@ public abstract class BaseFuture<V> implements Future<V> {
|
|||
return result;
|
||||
}
|
||||
|
||||
protected void done() {
|
||||
/**
|
||||
* Called when the {@link BaseFuture} is completed. The {@code success} boolean indicates if the {@link BaseFuture} was successfully
|
||||
* completed (the value is {@code true}). In the cases the {@link BaseFuture} was completed with an error or cancelled the
|
||||
* value is {@code false}.
|
||||
*
|
||||
* @param success indicates if the {@link BaseFuture} was completed with success (true); in other cases it equals to false
|
||||
*/
|
||||
protected void done(boolean success) {
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -93,7 +93,7 @@ public final class ListenableFuture<V> extends BaseFuture<V> implements ActionLi
|
|||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void done() {
|
||||
protected synchronized void done(boolean ignored) {
|
||||
done = true;
|
||||
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
|
||||
// release references to any listeners as we no longer need them and will live
|
||||
|
|
|
@ -274,8 +274,8 @@ public class CacheFile {
|
|||
}
|
||||
ensureOpen();
|
||||
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
|
||||
start,
|
||||
end,
|
||||
Tuple.tuple(start, end),
|
||||
Tuple.tuple(start, end), // TODO use progressive sub range to trigger read operations sooner
|
||||
ActionListener.wrap(
|
||||
rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
|
||||
rangeFailure -> future.completeExceptionally(rangeFailure)
|
||||
|
@ -286,7 +286,8 @@ public class CacheFile {
|
|||
try {
|
||||
ensureOpen();
|
||||
onRangeMissing.accept(gap.start, gap.end);
|
||||
gap.onResponse(null);
|
||||
gap.onProgress(gap.end); // TODO update progress in onRangeMissing
|
||||
gap.onCompletion();
|
||||
} catch (Exception e) {
|
||||
gap.onFailure(e);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.store.cache;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.AdapterActionFuture;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* An {@link ActionFuture} that listeners can be attached to. Listeners are executed when the future is completed
|
||||
* or when a given progress is reached. Progression is updated using the {@link #onProgress(long)} method.
|
||||
*
|
||||
* Listeners are executed within the thread that triggers the completion, the failure or the progress update and
|
||||
* the progress value passed to the listeners on execution is the last updated value.
|
||||
*/
|
||||
class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {
|
||||
|
||||
protected final long start;
|
||||
protected final long end;
|
||||
|
||||
// modified under 'this' mutex
|
||||
private volatile List<Tuple<Long, ActionListener<Long>>> listeners;
|
||||
protected volatile long progress;
|
||||
private volatile boolean completed;
|
||||
|
||||
/**
|
||||
* Creates a {@link ProgressListenableActionFuture} that accepts the progression
|
||||
* to be within {@code start} (inclusive) and {@code end} (exclusive) values.
|
||||
*
|
||||
* @param start the start (inclusive)
|
||||
* @param end the end (exclusive)
|
||||
*/
|
||||
ProgressListenableActionFuture(long start, long end) {
|
||||
super();
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
this.progress = start;
|
||||
this.completed = false;
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
private boolean invariant() {
|
||||
assert start < end : start + " < " + end;
|
||||
synchronized (this) {
|
||||
assert completed == false || listeners == null;
|
||||
assert start <= progress : start + " <= " + progress;
|
||||
assert progress <= end : progress + " <= " + end;
|
||||
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the progress of the current {@link ActionFuture} with the given value, indicating that the range from {@code start}
|
||||
* (inclusive) to {@code progress} (exclusive) is available. Calling this method potentially triggers the execution of one or
|
||||
* more listeners that are waiting for the progress to reach a value lower than the one just updated.
|
||||
*
|
||||
* @param progress the new progress value
|
||||
*/
|
||||
public void onProgress(final long progress) {
|
||||
ensureNotCompleted();
|
||||
|
||||
if (progress <= start) {
|
||||
assert false : progress + " <= " + start;
|
||||
throw new IllegalArgumentException("Cannot update progress with a value less than [start=" + start + ']');
|
||||
}
|
||||
if (end < progress) {
|
||||
assert false : end + " < " + progress;
|
||||
throw new IllegalArgumentException("Cannot update progress with a value greater than [end=" + end + ']');
|
||||
}
|
||||
|
||||
List<ActionListener<Long>> listenersToExecute = null;
|
||||
synchronized (this) {
|
||||
assert this.progress < progress : this.progress + " < " + progress;
|
||||
this.progress = progress;
|
||||
|
||||
final List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
|
||||
if (listeners != null) {
|
||||
List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null;
|
||||
for (Tuple<Long, ActionListener<Long>> listener : listeners) {
|
||||
if (progress < listener.v1()) {
|
||||
if (listenersToKeep == null) {
|
||||
listenersToKeep = new ArrayList<>();
|
||||
}
|
||||
listenersToKeep.add(listener);
|
||||
} else {
|
||||
if (listenersToExecute == null) {
|
||||
listenersToExecute = new ArrayList<>();
|
||||
}
|
||||
listenersToExecute.add(listener.v2());
|
||||
}
|
||||
}
|
||||
this.listeners = listenersToKeep;
|
||||
}
|
||||
}
|
||||
if (listenersToExecute != null) {
|
||||
listenersToExecute.forEach(listener -> executeListener(listener, () -> progress));
|
||||
}
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Long result) {
|
||||
ensureNotCompleted();
|
||||
super.onResponse(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
ensureNotCompleted();
|
||||
super.onFailure(e);
|
||||
}
|
||||
|
||||
private void ensureNotCompleted() {
|
||||
if (completed) {
|
||||
throw new IllegalStateException("Future is already completed");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void done(boolean success) {
|
||||
super.done(success);
|
||||
final List<Tuple<Long, ActionListener<Long>>> listenersToExecute;
|
||||
synchronized (this) {
|
||||
assert progress == end || success == false;
|
||||
completed = true;
|
||||
listenersToExecute = this.listeners;
|
||||
listeners = null;
|
||||
}
|
||||
if (listenersToExecute != null) {
|
||||
listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, () -> actionGet(0L)));
|
||||
}
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attach a {@link ActionListener} to the current future. The listener will be executed once the future is completed or once the
|
||||
* progress reaches the given {@code value}, whichever comes first.
|
||||
*
|
||||
* @param listener the {@link ActionListener} to add
|
||||
* @param value the value
|
||||
*/
|
||||
public void addListener(ActionListener<Long> listener, long value) {
|
||||
boolean executeImmediate = false;
|
||||
final long progress;
|
||||
synchronized (this) {
|
||||
progress = this.progress;
|
||||
if (completed || value <= progress) {
|
||||
executeImmediate = true;
|
||||
} else {
|
||||
List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
|
||||
if (listeners == null) {
|
||||
listeners = new ArrayList<>();
|
||||
}
|
||||
listeners.add(Tuple.tuple(value, listener));
|
||||
this.listeners = listeners;
|
||||
}
|
||||
}
|
||||
if (executeImmediate) {
|
||||
executeListener(listener, completed ? () -> actionGet(0L) : () -> progress);
|
||||
}
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
private void executeListener(final ActionListener<Long> listener, final Supplier<Long> result) {
|
||||
try {
|
||||
listener.onResponse(result.get());
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long convert(Long response) {
|
||||
if (response == null || response < start || end < response) {
|
||||
assert false : start + " < " + response + " < " + end;
|
||||
throw new IllegalArgumentException("Invalid completion value [start=" + start + ",end=" + end + ",response=" + response + ']');
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProgressListenableActionFuture[start="
|
||||
+ start
|
||||
+ ", end="
|
||||
+ end
|
||||
+ ", progress="
|
||||
+ progress
|
||||
+ ", completed="
|
||||
+ completed
|
||||
+ ", listeners="
|
||||
+ (listeners != null ? listeners.size() : 0)
|
||||
+ ']';
|
||||
}
|
||||
}
|
|
@ -7,7 +7,6 @@ package org.elasticsearch.index.store.cache;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.action.support.PlainListenableActionFuture;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
||||
|
@ -17,6 +16,7 @@ import java.util.Comparator;
|
|||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Keeps track of the contents of a file that may not be completely present.
|
||||
|
@ -57,19 +57,44 @@ public class SparseFileTracker {
|
|||
}
|
||||
|
||||
/**
|
||||
* Called before reading a range from the file to ensure that this range is present. Returns a list of gaps for the caller to fill.
|
||||
* Called before reading a range from the file to ensure that this range is present. Returns a list of gaps for the caller to fill. The
|
||||
* range from the file is defined by {@code range} but the listener is executed as soon as a (potentially smaller) sub range
|
||||
* {@code subRange} becomes available.
|
||||
*
|
||||
* @param start The (inclusive) start of the desired range
|
||||
* @param end The (exclusive) end of the desired range
|
||||
* @param listener Listener for when this range is fully available
|
||||
* @param range A tuple that contains the (inclusive) start and (exclusive) end of the desired range
|
||||
* @param subRange A tuple that contains the (inclusive) start and (exclusive) end of the listener's range
|
||||
* @param listener Listener for when the listening range is fully available
|
||||
* @return A collection of gaps that the client should fill in to satisfy this range
|
||||
* @throws IllegalArgumentException if invalid range is requested
|
||||
*/
|
||||
public List<Gap> waitForRange(final long start, final long end, final ActionListener<Void> listener) {
|
||||
public List<Gap> waitForRange(final Tuple<Long, Long> range, final Tuple<Long, Long> subRange, final ActionListener<Void> listener) {
|
||||
final long start = range.v1();
|
||||
final long end = range.v2();
|
||||
if (end < start || start < 0L || length < end) {
|
||||
throw new IllegalArgumentException("invalid range [start=" + start + ", end=" + end + ", length=" + length + "]");
|
||||
}
|
||||
|
||||
if (subRange.v2() < subRange.v1() || subRange.v1() < 0L || length < subRange.v2()) {
|
||||
throw new IllegalArgumentException(
|
||||
"invalid range to listen to [start=" + subRange.v1() + ", end=" + subRange.v2() + ", length=" + length + "]"
|
||||
);
|
||||
}
|
||||
if (subRange.v1() < start || end < subRange.v2()) {
|
||||
throw new IllegalArgumentException(
|
||||
"unable to listen to range [start="
|
||||
+ subRange.v1()
|
||||
+ ", end="
|
||||
+ subRange.v2()
|
||||
+ "] when range is [start="
|
||||
+ start
|
||||
+ ", end="
|
||||
+ end
|
||||
+ ", length="
|
||||
+ length
|
||||
+ "]"
|
||||
);
|
||||
}
|
||||
|
||||
final List<Gap> gaps = new ArrayList<>();
|
||||
synchronized (mutex) {
|
||||
assert invariant();
|
||||
|
@ -94,7 +119,11 @@ public class SparseFileTracker {
|
|||
|
||||
final SortedSet<Range> existingRanges = ranges.tailSet(targetRange);
|
||||
if (existingRanges.isEmpty()) {
|
||||
final Range newPendingRange = new Range(targetRange.start, end, PlainListenableActionFuture.newListenableFuture());
|
||||
final Range newPendingRange = new Range(
|
||||
targetRange.start,
|
||||
end,
|
||||
new ProgressListenableActionFuture(targetRange.start, end)
|
||||
);
|
||||
ranges.add(newPendingRange);
|
||||
pendingRanges.add(newPendingRange);
|
||||
gaps.add(new Gap(targetRange.start, end));
|
||||
|
@ -109,12 +138,12 @@ public class SparseFileTracker {
|
|||
}
|
||||
targetRange.start = Math.min(end, firstExistingRange.end);
|
||||
} else {
|
||||
final long newPendingRangeEnd = Math.min(end, firstExistingRange.start);
|
||||
final Range newPendingRange = new Range(
|
||||
targetRange.start,
|
||||
Math.min(end, firstExistingRange.start),
|
||||
PlainListenableActionFuture.newListenableFuture()
|
||||
newPendingRangeEnd,
|
||||
new ProgressListenableActionFuture(targetRange.start, newPendingRangeEnd)
|
||||
);
|
||||
|
||||
ranges.add(newPendingRange);
|
||||
pendingRanges.add(newPendingRange);
|
||||
gaps.add(new Gap(targetRange.start, newPendingRange.end));
|
||||
|
@ -129,16 +158,40 @@ public class SparseFileTracker {
|
|||
if (pendingRanges.isEmpty() == false) {
|
||||
assert ranges.containsAll(pendingRanges) : ranges + " vs " + pendingRanges;
|
||||
assert pendingRanges.stream().allMatch(Range::isPending) : pendingRanges;
|
||||
assert pendingRanges.size() != 1 || gaps.size() <= 1 : gaps;
|
||||
|
||||
if (pendingRanges.size() == 1) {
|
||||
assert gaps.size() <= 1 : gaps;
|
||||
pendingRanges.get(0).completionListener.addListener(listener);
|
||||
} else {
|
||||
final GroupedActionListener<Void> groupedActionListener = new GroupedActionListener<>(
|
||||
ActionListener.map(listener, ignored -> null),
|
||||
pendingRanges.size()
|
||||
);
|
||||
pendingRanges.forEach(pendingRange -> pendingRange.completionListener.addListener(groupedActionListener));
|
||||
// Pending ranges that needs to be filled before executing the listener
|
||||
final List<Range> requiredRanges = (start == subRange.v1() && end == subRange.v2())
|
||||
? pendingRanges
|
||||
: pendingRanges.stream()
|
||||
.filter(pendingRange -> pendingRange.start < subRange.v2())
|
||||
.filter(pendingRange -> subRange.v1() < pendingRange.end)
|
||||
.sorted(Comparator.comparingLong(r -> r.start))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
switch (requiredRanges.size()) {
|
||||
case 0:
|
||||
// no need to wait for the gaps to be filled, the listener can be executed immediately
|
||||
listener.onResponse(null);
|
||||
break;
|
||||
case 1:
|
||||
final Range requiredRange = requiredRanges.get(0);
|
||||
requiredRange.completionListener.addListener(
|
||||
ActionListener.map(listener, progress -> null),
|
||||
Math.min(requiredRange.end, subRange != null ? subRange.v2() : Long.MAX_VALUE)
|
||||
);
|
||||
break;
|
||||
default:
|
||||
final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
|
||||
ActionListener.map(listener, progress -> null),
|
||||
requiredRanges.size()
|
||||
);
|
||||
requiredRanges.forEach(
|
||||
r -> r.completionListener.addListener(
|
||||
groupedActionListener,
|
||||
Math.min(r.end, subRange != null ? subRange.v2() : Long.MAX_VALUE)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return Collections.unmodifiableList(gaps);
|
||||
|
@ -203,7 +256,7 @@ public class SparseFileTracker {
|
|||
}
|
||||
|
||||
private void onGapSuccess(final long start, final long end) {
|
||||
final PlainListenableActionFuture<Void> completionListener;
|
||||
final ProgressListenableActionFuture completionListener;
|
||||
|
||||
synchronized (mutex) {
|
||||
assert invariant();
|
||||
|
@ -249,11 +302,30 @@ public class SparseFileTracker {
|
|||
assert invariant();
|
||||
}
|
||||
|
||||
completionListener.onResponse(null);
|
||||
completionListener.onResponse(end);
|
||||
}
|
||||
|
||||
private void onGapProgress(long start, long end, long value) {
|
||||
final ProgressListenableActionFuture completionListener;
|
||||
|
||||
synchronized (mutex) {
|
||||
assert invariant();
|
||||
|
||||
final Range range = new Range(start, end, null);
|
||||
final SortedSet<Range> existingRanges = ranges.tailSet(range);
|
||||
assert existingRanges.isEmpty() == false;
|
||||
|
||||
final Range existingRange = existingRanges.first();
|
||||
assert existingRange.start == start && existingRange.end == end && existingRange.isPending();
|
||||
completionListener = existingRange.completionListener;
|
||||
assert invariant();
|
||||
}
|
||||
|
||||
completionListener.onProgress(value);
|
||||
}
|
||||
|
||||
private void onGapFailure(long start, long end, Exception e) {
|
||||
final PlainListenableActionFuture<Void> completionListener;
|
||||
final ProgressListenableActionFuture completionListener;
|
||||
|
||||
synchronized (mutex) {
|
||||
assert invariant();
|
||||
|
@ -313,7 +385,7 @@ public class SparseFileTracker {
|
|||
/**
|
||||
* Represents a gap in the file that a client should fill in.
|
||||
*/
|
||||
public class Gap implements ActionListener<Void> {
|
||||
public class Gap {
|
||||
/**
|
||||
* Inclusive start point of this range
|
||||
*/
|
||||
|
@ -330,12 +402,14 @@ public class SparseFileTracker {
|
|||
this.end = end;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
public void onCompletion() {
|
||||
onGapSuccess(start, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onProgress(long value) {
|
||||
onGapProgress(start, end, value);
|
||||
}
|
||||
|
||||
public void onFailure(Exception e) {
|
||||
onGapFailure(start, end, e);
|
||||
}
|
||||
|
@ -358,9 +432,9 @@ public class SparseFileTracker {
|
|||
long end;
|
||||
|
||||
@Nullable // if not pending
|
||||
final PlainListenableActionFuture<Void> completionListener;
|
||||
final ProgressListenableActionFuture completionListener;
|
||||
|
||||
Range(long start, long end, PlainListenableActionFuture<Void> completionListener) {
|
||||
Range(long start, long end, @Nullable ProgressListenableActionFuture completionListener) {
|
||||
assert start <= end : start + "-" + end;
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
|
|
|
@ -0,0 +1,243 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.store.cache;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
||||
public class ProgressListenableActionFutureTests extends ESTestCase {
|
||||
|
||||
public void testOnResponseCallsListeners() {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
|
||||
final AtomicArray<Long> listenersResponses = new AtomicArray<>(between(0, 50));
|
||||
for (int i = 0; i < listenersResponses.length(); i++) {
|
||||
final int listenerIndex = i;
|
||||
future.addListener(
|
||||
ActionListener.wrap(
|
||||
progress -> listenersResponses.setOnce(listenerIndex, progress),
|
||||
e -> listenersResponses.setOnce(listenerIndex, null)
|
||||
),
|
||||
randomLongBetween(future.start + 1L, future.end) // +1 to avoid immediate execution
|
||||
);
|
||||
}
|
||||
assertTrue(listenersResponses.asList().stream().allMatch(Objects::isNull));
|
||||
future.onProgress(future.end);
|
||||
future.onResponse(future.end);
|
||||
assertTrue(listenersResponses.asList().stream().allMatch(value -> value <= future.end));
|
||||
|
||||
final IllegalStateException ise = expectThrows(IllegalStateException.class, () -> future.onResponse(future.end));
|
||||
assertThat(ise.getMessage(), containsString("Future is already completed"));
|
||||
}
|
||||
|
||||
public void testOnFailureCallsListeners() {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
|
||||
final AtomicArray<Exception> listenersResponses = new AtomicArray<>(between(0, 50));
|
||||
for (int i = 0; i < listenersResponses.length(); i++) {
|
||||
final int listenerIndex = i;
|
||||
future.addListener(
|
||||
ActionListener.wrap(
|
||||
o -> listenersResponses.setOnce(listenerIndex, null),
|
||||
e -> listenersResponses.setOnce(listenerIndex, e)
|
||||
),
|
||||
randomLongBetween(future.start + 1L, future.end) // +1 to avoid immediate execution
|
||||
);
|
||||
}
|
||||
assertTrue(listenersResponses.asList().stream().allMatch(Objects::isNull));
|
||||
|
||||
final Exception exception = new ElasticsearchException("simulated");
|
||||
future.onFailure(exception);
|
||||
|
||||
for (int i = 0; i < listenersResponses.length(); i++) {
|
||||
assertThat(listenersResponses.get(i), sameInstance(exception));
|
||||
}
|
||||
|
||||
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> future.onFailure(exception));
|
||||
assertThat(ise.getMessage(), containsString("Future is already completed"));
|
||||
}
|
||||
|
||||
public void testProgressUpdatesCallsListeners() throws Exception {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
|
||||
final Thread[] threads = new Thread[between(1, 5)];
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
threads[i] = new Thread(() -> {
|
||||
try {
|
||||
startLatch.await();
|
||||
while (future.isDone() == false) {
|
||||
final long expectedProgress = randomLongBetween(future.start, future.end);
|
||||
final PlainActionFuture<Long> listener = new PlainActionFuture<>();
|
||||
future.addListener(ActionListener.wrap(listener::onResponse, listener::onFailure), expectedProgress);
|
||||
assertThat(listener.get(), greaterThanOrEqualTo(expectedProgress));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.error("Failed to wait for progress to be reached", t);
|
||||
if (future.isDone() == false) {
|
||||
future.onFailure(
|
||||
new Exception("Failed to update progress [" + t.getClass().getName() + ':' + t.getMessage() + "]")
|
||||
);
|
||||
}
|
||||
throw new AssertionError(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
|
||||
final Thread progressUpdaterThread = new Thread(() -> {
|
||||
try {
|
||||
startLatch.await();
|
||||
long progress = future.start;
|
||||
while (progress < future.end) {
|
||||
progress = randomLongBetween(progress + 1L, future.end);
|
||||
future.onProgress(progress);
|
||||
}
|
||||
future.onResponse(future.end);
|
||||
} catch (Throwable t) {
|
||||
logger.error("Failed to update progress", t);
|
||||
if (future.isDone() == false) {
|
||||
future.onFailure(new Exception("Failed to update progress [" + t.getClass().getName() + ':' + t.getMessage() + "]"));
|
||||
}
|
||||
throw new AssertionError(t);
|
||||
}
|
||||
});
|
||||
progressUpdaterThread.start();
|
||||
|
||||
startLatch.countDown();
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
progressUpdaterThread.join();
|
||||
assertTrue(future.isDone());
|
||||
}
|
||||
|
||||
public void testPartialProgressionThenFailure() throws Exception {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
final long limit = randomLongBetween(future.start + 1L, future.end);
|
||||
|
||||
final Set<PlainActionFuture<Long>> completedListeners = new HashSet<>();
|
||||
for (long i = 0L; i < between(1, 10); i++) {
|
||||
final PlainActionFuture<Long> listener = new PlainActionFuture<>();
|
||||
future.addListener(ActionListener.wrap(listener::onResponse, listener::onFailure), randomLongBetween(future.start, limit));
|
||||
completedListeners.add(listener);
|
||||
}
|
||||
|
||||
final Set<PlainActionFuture<Long>> failedListeners = new HashSet<>();
|
||||
if (limit < future.end) {
|
||||
for (long i = 0L; i < between(1, 10); i++) {
|
||||
final PlainActionFuture<Long> listener = new PlainActionFuture<>();
|
||||
future.addListener(
|
||||
ActionListener.wrap(listener::onResponse, listener::onFailure),
|
||||
randomLongBetween(limit + 1L, future.end)
|
||||
);
|
||||
failedListeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
long progress = future.start;
|
||||
while (progress < limit) {
|
||||
progress = randomLongBetween(progress + 1L, limit);
|
||||
future.onProgress(progress);
|
||||
}
|
||||
|
||||
final ElasticsearchException exception = new ElasticsearchException("Failure at " + limit);
|
||||
future.onFailure(exception);
|
||||
assertTrue(future.isDone());
|
||||
|
||||
for (PlainActionFuture<Long> completedListener : completedListeners) {
|
||||
assertThat(completedListener.isDone(), is(true));
|
||||
assertThat(completedListener.actionGet(), lessThanOrEqualTo(limit));
|
||||
}
|
||||
|
||||
for (PlainActionFuture<Long> failedListener : failedListeners) {
|
||||
assertThat(failedListener.isDone(), is(true));
|
||||
assertThat(expectThrows(ElasticsearchException.class, failedListener::actionGet), sameInstance(exception));
|
||||
}
|
||||
}
|
||||
|
||||
public void testListenerCalledImmediatelyAfterResponse() throws Exception {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
future.onProgress(future.end);
|
||||
future.onResponse(future.end);
|
||||
assertTrue(future.isDone());
|
||||
|
||||
final SetOnce<Long> listenerResponse = new SetOnce<>();
|
||||
final SetOnce<Exception> listenerFailure = new SetOnce<>();
|
||||
|
||||
future.addListener(ActionListener.wrap(listenerResponse::set, listenerFailure::set), randomLongBetween(future.start, future.end));
|
||||
|
||||
assertThat(listenerResponse.get(), equalTo(future.get()));
|
||||
assertThat(listenerFailure.get(), nullValue());
|
||||
}
|
||||
|
||||
public void testListenerCalledImmediatelyAfterFailure() {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
|
||||
final Exception failure = new ElasticsearchException("simulated");
|
||||
future.onFailure(failure);
|
||||
assertTrue(future.isDone());
|
||||
|
||||
final SetOnce<Exception> listenerFailure = new SetOnce<>();
|
||||
final SetOnce<Long> listenerResponse = new SetOnce<>();
|
||||
|
||||
future.addListener(ActionListener.wrap(listenerResponse::set, listenerFailure::set), randomLongBetween(future.start, future.end));
|
||||
|
||||
assertThat(listenerFailure.get(), sameInstance(failure));
|
||||
assertThat(listenerResponse.get(), nullValue());
|
||||
}
|
||||
|
||||
public void testListenerCalledImmediatelyWhenProgressReached() {
|
||||
final ProgressListenableActionFuture future = randomFuture();
|
||||
final long progress = randomLongBetween(future.start, future.end);
|
||||
|
||||
final PlainActionFuture<Long> listenerResponse = PlainActionFuture.newFuture();
|
||||
if (randomBoolean()) {
|
||||
future.onProgress(progress);
|
||||
future.addListener(listenerResponse, randomLongBetween(future.start, progress));
|
||||
} else {
|
||||
future.addListener(listenerResponse, randomLongBetween(future.start, progress));
|
||||
future.onProgress(progress);
|
||||
}
|
||||
|
||||
assertThat(listenerResponse.isDone(), is(true));
|
||||
assertThat(listenerResponse.actionGet(), equalTo(progress));
|
||||
|
||||
future.onProgress(future.end);
|
||||
future.onResponse(future.end);
|
||||
assertThat(future.isDone(), is(true));
|
||||
}
|
||||
|
||||
private static ProgressListenableActionFuture randomFuture() {
|
||||
final long delta = randomLongBetween(1L, ByteSizeUnit.TB.toBytes(1L));
|
||||
final long start = randomLongBetween(Long.MIN_VALUE, Long.MAX_VALUE - delta);
|
||||
return new ProgressListenableActionFuture(start, start + delta);
|
||||
}
|
||||
}
|
|
@ -49,14 +49,14 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
|
||||
IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(-1L, randomLongBetween(0L, length), listener)
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(-1L, randomLongBetween(0L, length)), null, listener)
|
||||
);
|
||||
assertThat("start must not be negative", e.getMessage(), containsString("invalid range"));
|
||||
assertThat(invoked.get(), is(false));
|
||||
|
||||
e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L, listener)
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(randomLongBetween(0L, Math.max(0L, length - 1L)), length + 1L), null, listener)
|
||||
);
|
||||
assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
|
||||
assertThat(invoked.get(), is(false));
|
||||
|
@ -65,10 +65,61 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
e = expectThrows(IllegalArgumentException.class, () -> {
|
||||
long start = randomLongBetween(1L, Math.max(1L, length - 1L));
|
||||
long end = randomLongBetween(0L, start - 1L);
|
||||
sparseFileTracker.waitForRange(start, end, listener);
|
||||
sparseFileTracker.waitForRange(Tuple.tuple(start, end), null, listener);
|
||||
});
|
||||
assertThat("end must not be greater than length", e.getMessage(), containsString("invalid range"));
|
||||
assertThat(invoked.get(), is(false));
|
||||
|
||||
final long start = randomLongBetween(0L, length - 1L);
|
||||
final long end = randomLongBetween(start + 1L, length);
|
||||
|
||||
if (start > 0L) {
|
||||
e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
|
||||
);
|
||||
assertThat(
|
||||
"listener range start must not be smaller than range start",
|
||||
e.getMessage(),
|
||||
containsString("unable to listen to range")
|
||||
);
|
||||
assertThat(invoked.get(), is(false));
|
||||
} else {
|
||||
e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start - 1L, end), listener)
|
||||
);
|
||||
assertThat(
|
||||
"listener range start must not be smaller than zero",
|
||||
e.getMessage(),
|
||||
containsString("invalid range to listen to")
|
||||
);
|
||||
assertThat(invoked.get(), is(false));
|
||||
}
|
||||
|
||||
if (end < length) {
|
||||
e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
|
||||
);
|
||||
assertThat(
|
||||
"listener range end must not be greater than range end",
|
||||
e.getMessage(),
|
||||
containsString("unable to listen to range")
|
||||
);
|
||||
assertThat(invoked.get(), is(false));
|
||||
} else {
|
||||
e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> sparseFileTracker.waitForRange(Tuple.tuple(start, end), Tuple.tuple(start, end + 1L), listener)
|
||||
);
|
||||
assertThat(
|
||||
"listener range end must not be greater than length",
|
||||
e.getMessage(),
|
||||
containsString("invalid range to listen to")
|
||||
);
|
||||
assertThat(invoked.get(), is(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,10 +143,11 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
final Tuple<Long, Long> range = Tuple.tuple(start, end);
|
||||
if (pending) {
|
||||
final AtomicBoolean expectNotification = new AtomicBoolean();
|
||||
final AtomicBoolean wasNotified = new AtomicBoolean();
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(start, end, ActionListener.wrap(ignored -> {
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(range, range, ActionListener.wrap(ignored -> {
|
||||
assertTrue(expectNotification.get());
|
||||
assertTrue(wasNotified.compareAndSet(false, true));
|
||||
}, e -> { throw new AssertionError(e); }));
|
||||
|
@ -103,23 +155,164 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
final SparseFileTracker.Gap gap = gaps.get(gapIndex);
|
||||
assertThat(gap.start, greaterThanOrEqualTo(start));
|
||||
assertThat(gap.end, lessThanOrEqualTo(end));
|
||||
// listener is notified when the last gap is completed
|
||||
final AtomicBoolean shouldNotifyListener = new AtomicBoolean();
|
||||
for (long i = gap.start; i < gap.end; i++) {
|
||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||
// listener is notified when the progress reached the last byte of the last gap
|
||||
if ((gapIndex == gaps.size() - 1) && (i == gap.end - 1L)) {
|
||||
assertTrue(shouldNotifyListener.compareAndSet(false, true));
|
||||
expectNotification.set(true);
|
||||
}
|
||||
gap.onProgress(i + 1L);
|
||||
assertThat(wasNotified.get(), equalTo(shouldNotifyListener.get()));
|
||||
}
|
||||
assertFalse(wasNotified.get());
|
||||
if (gapIndex == gaps.size() - 1) {
|
||||
expectNotification.set(true);
|
||||
}
|
||||
gap.onResponse(null);
|
||||
assertThat(wasNotified.get(), equalTo(shouldNotifyListener.get()));
|
||||
gap.onCompletion();
|
||||
}
|
||||
assertTrue(wasNotified.get());
|
||||
}
|
||||
|
||||
final AtomicBoolean wasNotified = new AtomicBoolean();
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
||||
start,
|
||||
end,
|
||||
range,
|
||||
range,
|
||||
ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
||||
);
|
||||
assertThat(gaps, empty());
|
||||
assertTrue(wasNotified.get());
|
||||
}
|
||||
|
||||
public void testCallsListenerWhenRangeIsAvailable() {
|
||||
final byte[] fileContents = new byte[between(0, 1000)];
|
||||
final SparseFileTracker sparseFileTracker = new SparseFileTracker("test", fileContents.length);
|
||||
|
||||
final Set<AtomicBoolean> listenersCalled = new HashSet<>();
|
||||
for (int i = between(0, 10); i > 0; i--) {
|
||||
waitForRandomRange(fileContents, sparseFileTracker, listenersCalled::add, gap -> processGap(fileContents, gap));
|
||||
assertTrue(listenersCalled.stream().allMatch(AtomicBoolean::get));
|
||||
}
|
||||
|
||||
final Tuple<Long, Long> range;
|
||||
{
|
||||
final long start = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
||||
range = Tuple.tuple(start, randomLongBetween(start, fileContents.length));
|
||||
}
|
||||
|
||||
final Tuple<Long, Long> subRange;
|
||||
{
|
||||
final long rangeLength = range.v2() - range.v1();
|
||||
if (rangeLength > 1L) {
|
||||
final long start = randomLongBetween(range.v1(), range.v2() - 1L);
|
||||
subRange = Tuple.tuple(start, randomLongBetween(start + 1L, range.v2()));
|
||||
} else {
|
||||
subRange = Tuple.tuple(range.v1(), range.v2());
|
||||
}
|
||||
}
|
||||
|
||||
boolean pending = false;
|
||||
for (long i = subRange.v1(); i < subRange.v2(); i++) {
|
||||
if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) {
|
||||
pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (pending == false) {
|
||||
final AtomicBoolean wasNotified = new AtomicBoolean();
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
||||
range,
|
||||
subRange,
|
||||
ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
||||
);
|
||||
|
||||
assertTrue(
|
||||
"All bytes of the sub range " + subRange + " are available, listener must be executed immediately",
|
||||
wasNotified.get()
|
||||
);
|
||||
|
||||
for (final SparseFileTracker.Gap gap : gaps) {
|
||||
assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
||||
assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
||||
|
||||
for (long i = gap.start; i < gap.end; i++) {
|
||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||
assertTrue(wasNotified.get());
|
||||
gap.onProgress(i + 1L);
|
||||
}
|
||||
gap.onCompletion();
|
||||
}
|
||||
|
||||
} else {
|
||||
final AtomicBoolean wasNotified = new AtomicBoolean();
|
||||
final AtomicBoolean expectNotification = new AtomicBoolean();
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(range, subRange, ActionListener.wrap(ignored -> {
|
||||
assertTrue(expectNotification.get());
|
||||
assertTrue(wasNotified.compareAndSet(false, true));
|
||||
}, e -> { throw new AssertionError(e); }));
|
||||
|
||||
assertFalse("Listener should not have been executed yet", wasNotified.get());
|
||||
|
||||
long triggeringProgress = -1L;
|
||||
for (long i = subRange.v1(); i < subRange.v2(); i++) {
|
||||
if (fileContents[Math.toIntExact(i)] == UNAVAILABLE) {
|
||||
triggeringProgress = i;
|
||||
}
|
||||
}
|
||||
assertThat(triggeringProgress, greaterThanOrEqualTo(0L));
|
||||
|
||||
for (final SparseFileTracker.Gap gap : gaps) {
|
||||
assertThat(gap.start, greaterThanOrEqualTo(range.v1()));
|
||||
assertThat(gap.end, lessThanOrEqualTo(range.v2()));
|
||||
|
||||
for (long i = gap.start; i < gap.end; i++) {
|
||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(UNAVAILABLE));
|
||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||
if (triggeringProgress == i) {
|
||||
assertFalse(expectNotification.getAndSet(true));
|
||||
}
|
||||
assertThat(
|
||||
"Listener should not have been called before ["
|
||||
+ triggeringProgress
|
||||
+ "] is reached, but it was triggered after progress got updated to ["
|
||||
+ i
|
||||
+ ']',
|
||||
wasNotified.get(),
|
||||
equalTo(triggeringProgress < i)
|
||||
);
|
||||
|
||||
gap.onProgress(i + 1L);
|
||||
|
||||
assertThat(
|
||||
"Listener should not have been called before ["
|
||||
+ triggeringProgress
|
||||
+ "] is reached, but it was triggered after progress got updated to ["
|
||||
+ i
|
||||
+ ']',
|
||||
wasNotified.get(),
|
||||
equalTo(triggeringProgress < i + 1L)
|
||||
);
|
||||
}
|
||||
gap.onCompletion();
|
||||
|
||||
assertThat(
|
||||
"Listener should not have been called before ["
|
||||
+ triggeringProgress
|
||||
+ "] is reached, but it was triggered once gap ["
|
||||
+ gap
|
||||
+ "] was completed",
|
||||
wasNotified.get(),
|
||||
equalTo(triggeringProgress < gap.end)
|
||||
);
|
||||
}
|
||||
assertTrue(wasNotified.get());
|
||||
}
|
||||
|
||||
final AtomicBoolean wasNotified = new AtomicBoolean();
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
||||
range,
|
||||
subRange,
|
||||
ActionListener.wrap(ignored -> assertTrue(wasNotified.compareAndSet(false, true)), e -> { throw new AssertionError(e); })
|
||||
);
|
||||
assertThat(gaps, empty());
|
||||
|
@ -232,25 +425,33 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
Consumer<AtomicBoolean> listenerCalledConsumer,
|
||||
Consumer<SparseFileTracker.Gap> gapConsumer
|
||||
) {
|
||||
final long start = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
||||
final long end = randomLongBetween(start, fileContents.length);
|
||||
final long rangeStart = randomLongBetween(0L, Math.max(0L, fileContents.length - 1));
|
||||
final long rangeEnd = randomLongBetween(rangeStart, fileContents.length);
|
||||
final AtomicBoolean listenerCalled = new AtomicBoolean();
|
||||
listenerCalledConsumer.accept(listenerCalled);
|
||||
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(start, end, new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
for (long i = start; i < end; i++) {
|
||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
|
||||
}
|
||||
assertTrue(listenerCalled.compareAndSet(false, true));
|
||||
}
|
||||
final boolean useSubRange = randomBoolean();
|
||||
final long subRangeStart = useSubRange ? randomLongBetween(rangeStart, rangeEnd) : rangeStart;
|
||||
final long subRangeEnd = useSubRange ? randomLongBetween(subRangeStart, rangeEnd) : rangeEnd;
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertTrue(listenerCalled.compareAndSet(false, true));
|
||||
final List<SparseFileTracker.Gap> gaps = sparseFileTracker.waitForRange(
|
||||
Tuple.tuple(rangeStart, rangeEnd),
|
||||
Tuple.tuple(subRangeStart, subRangeEnd),
|
||||
new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
for (long i = subRangeStart; i < subRangeEnd; i++) {
|
||||
assertThat(fileContents[Math.toIntExact(i)], equalTo(AVAILABLE));
|
||||
}
|
||||
assertTrue(listenerCalled.compareAndSet(false, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertTrue(listenerCalled.compareAndSet(false, true));
|
||||
}
|
||||
}
|
||||
});
|
||||
);
|
||||
|
||||
for (final SparseFileTracker.Gap gap : gaps) {
|
||||
for (long i = gap.start; i < gap.end; i++) {
|
||||
|
@ -270,8 +471,9 @@ public class SparseFileTrackerTests extends ESTestCase {
|
|||
} else {
|
||||
for (long i = gap.start; i < gap.end; i++) {
|
||||
fileContents[Math.toIntExact(i)] = AVAILABLE;
|
||||
gap.onProgress(i + 1L);
|
||||
}
|
||||
gap.onResponse(null);
|
||||
gap.onCompletion();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue