mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-23 13:26:02 +00:00
Reindex negative TimeValue fix (#54057)
Reindex would use timeValueNanos(System.nanoTime()). The intended use for TimeValue is as a duration, not as absolute time. In particular, this could result in negative TimeValue's, being unsupported in #53913. Modified to use the bare long nano-second value.
This commit is contained in:
parent
fc498f625a
commit
7ce7aff66e
@ -248,16 +248,16 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
|
||||
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
|
||||
// like a bug?
|
||||
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
|
||||
onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a scroll response.
|
||||
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
|
||||
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
|
||||
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
|
||||
* @param asyncResponse the response to process from ScrollableHitSource
|
||||
*/
|
||||
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
|
||||
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
|
||||
ScrollableHitSource.Response response = asyncResponse.response();
|
||||
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
|
||||
if (task.isCancelled()) {
|
||||
@ -285,7 +285,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
|
||||
* waiting on the scroll doesn't count against this batch in the throttle.
|
||||
*/
|
||||
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
|
||||
prepareBulkRequest(System.nanoTime(), asyncResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -294,7 +294,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
}
|
||||
};
|
||||
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
|
||||
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
|
||||
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -302,7 +302,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
|
||||
* thread may be blocked by the user script.
|
||||
*/
|
||||
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
|
||||
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
|
||||
ScrollableHitSource.Response response = asyncResponse.response();
|
||||
logger.debug("[{}]: preparing bulk request", task.getId());
|
||||
if (task.isCancelled()) {
|
||||
@ -328,12 +328,12 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
/*
|
||||
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
|
||||
*/
|
||||
notifyDone(thisBatchStartTime, asyncResponse, 0);
|
||||
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
|
||||
return;
|
||||
}
|
||||
request.timeout(mainRequest.getTimeout());
|
||||
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
|
||||
sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
|
||||
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -419,14 +419,14 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
||||
}
|
||||
}
|
||||
|
||||
void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
|
||||
void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
|
||||
if (task.isCancelled()) {
|
||||
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
|
||||
finishHim(null);
|
||||
return;
|
||||
}
|
||||
this.lastBatchSize = batchSize;
|
||||
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
|
||||
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
|
||||
}
|
||||
|
||||
private void recordFailure(Failure failure, List<Failure> failures) {
|
||||
|
@ -112,7 +112,6 @@ import static java.util.Collections.synchronizedSet;
|
||||
import static org.apache.lucene.util.TestUtil.randomSimpleString;
|
||||
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
@ -256,7 +255,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
|
||||
long total = randomIntBetween(0, Integer.MAX_VALUE);
|
||||
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
|
||||
assertEquals(total, testTask.getStatus().getTotal());
|
||||
}
|
||||
|
||||
@ -269,7 +268,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
|
||||
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
|
||||
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
|
||||
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
|
||||
simulateScrollResponse(action, System.nanoTime(), 0, response);
|
||||
|
||||
// Use assert busy because the update happens on another thread
|
||||
final int expectedBatches = batches;
|
||||
@ -355,7 +354,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
});
|
||||
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
|
||||
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
|
||||
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
|
||||
assertThat(e.getCause(), hasToString(containsString("test")));
|
||||
@ -373,7 +372,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
|
||||
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
|
||||
emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), contains(shardFailure));
|
||||
@ -387,7 +386,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
*/
|
||||
public void testSearchTimeoutsAbortRequest() throws Exception {
|
||||
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
|
||||
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
|
||||
BulkByScrollResponse response = listener.get();
|
||||
assertThat(response.getBulkFailures(), empty());
|
||||
assertThat(response.getSearchFailures(), empty());
|
||||
@ -424,7 +423,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
|
||||
hit.setSource(new BytesArray("{}"), XContentType.JSON);
|
||||
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
|
||||
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
|
||||
simulateScrollResponse(action, System.nanoTime(), 0, response);
|
||||
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
|
||||
assertThat(e.getCause(), instanceOf(RuntimeException.class));
|
||||
assertThat(e.getCause().getMessage(), equalTo("surprise"));
|
||||
@ -620,7 +619,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testCancelBeforeScrollResponse() throws Exception {
|
||||
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
|
||||
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
|
||||
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
|
||||
}
|
||||
|
||||
@ -635,7 +634,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testCancelBeforeStartNextScroll() throws Exception {
|
||||
TimeValue now = timeValueNanos(System.nanoTime());
|
||||
long now = System.nanoTime();
|
||||
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
|
||||
}
|
||||
|
||||
@ -684,7 +683,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
|
||||
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
|
||||
worker.rethrottle(1);
|
||||
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
|
||||
simulateScrollResponse(action, System.nanoTime(), 1000, response);
|
||||
|
||||
// Now that we've got our cancel we'll just verify that it all came through all right
|
||||
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
|
||||
@ -713,7 +712,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
||||
/**
|
||||
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
|
||||
*/
|
||||
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
|
||||
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
|
||||
ScrollableHitSource.Response response) {
|
||||
action.setScroll(scrollId());
|
||||
action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {
|
||||
|
@ -182,11 +182,11 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
|
||||
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
|
||||
* rescheduled over and over again.
|
||||
*/
|
||||
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
|
||||
public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
|
||||
AbstractRunnable prepareBulkRequestRunnable) {
|
||||
// Synchronize so we are less likely to schedule the same request twice.
|
||||
synchronized (delayedPrepareBulkRequestReference) {
|
||||
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
|
||||
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
|
||||
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
|
||||
try {
|
||||
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
|
||||
@ -197,8 +197,8 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
|
||||
}
|
||||
}
|
||||
|
||||
public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
|
||||
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
|
||||
public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
|
||||
long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
|
||||
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
|
||||
return timeValueNanos(waitTime);
|
||||
}
|
||||
|
@ -36,7 +36,6 @@ import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
||||
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
||||
import static org.hamcrest.Matchers.both;
|
||||
import static org.hamcrest.Matchers.closeTo;
|
||||
@ -152,7 +151,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
|
||||
}
|
||||
};
|
||||
try {
|
||||
workerState.delayPrepareBulkRequest(threadPool, timeValueNanos(System.nanoTime()), batchSizeForMaxDelay,
|
||||
workerState.delayPrepareBulkRequest(threadPool, System.nanoTime(), batchSizeForMaxDelay,
|
||||
new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
@ -225,7 +224,7 @@ public class WorkerBulkByScrollTaskStateTests extends ESTestCase {
|
||||
};
|
||||
try {
|
||||
// Have the task use the thread pool to delay a task that does nothing
|
||||
workerState.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), 1, new AbstractRunnable() {
|
||||
workerState.delayPrepareBulkRequest(threadPool, 0, 1, new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user