From d3b130606932f8664e928f67c032c2fb796547e7 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 15 Apr 2016 16:51:50 -0400 Subject: [PATCH] Reindex: never report negative throttled_until Just clamp the value at 0. It isn't useful to tell the user "this thread should have woken 5ms ago". Closes #17783 --- .../index/reindex/BulkByScrollTask.java | 3 +- .../index/reindex/BulkByScrollTaskTests.java | 63 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 7149dbb2094..42798d5a34d 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static java.lang.Math.max; import static java.lang.Math.round; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; @@ -93,7 +94,7 @@ public class BulkByScrollTask extends CancellableTask { if (delayed.future == null) { return timeValueNanos(0); } - return timeValueNanos(delayed.future.getDelay(TimeUnit.NANOSECONDS)); + return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); } /** diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java index 77472906b4a..e30c55b8096 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java @@ -29,8 +29,11 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; @@ -206,4 +209,64 @@ public class BulkByScrollTaskTests extends ESTestCase { } assertThat(errors, empty()); } + + public void testDelayNeverNegative() throws IOException { + // Thread pool that returns a ScheduledFuture that claims to have a negative delay + ThreadPool threadPool = new ThreadPool("test") { + public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + return new ScheduledFuture() { + @Override + public long getDelay(TimeUnit unit) { + return -1; + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + }; + } + }; + try { + // Have the task use the thread pool to delay a task that does nothing + task.delayPrepareBulkRequest(threadPool, timeValueSeconds(0), new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + } + @Override + public void onFailure(Throwable t) { + throw new UnsupportedOperationException(); + } + }); + // Even though the future returns a negative delay we just return 0 because the time is up. + assertEquals(timeValueSeconds(0), task.getStatus().getThrottledUntil()); + } finally { + threadPool.shutdown(); + } + } }