diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 7efd1ee5d6e..b4199293f79 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -35,8 +35,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; -import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -65,6 +63,8 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.reindex.ScrollableHitSource.Hit; +import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -323,8 +323,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase { // While we're here we can check that the sleep made it through assertThat(delay.nanos(), greaterThan(0L)); assertThat(delay.seconds(), lessThanOrEqualTo(10L)); - ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test")); - return null; + final EsRejectedExecutionException exception = new EsRejectedExecutionException("test"); + if (command instanceof AbstractRunnable) { + ((AbstractRunnable) command).onRejection(exception); + return null; + } else { + throw exception; + } } }); ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java new file mode 100644 index 00000000000..300413ac44c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Runnable that can only be run one time. + */ +public class RunOnce implements Runnable { + + private final Runnable delegate; + private final AtomicBoolean hasRun; + + public RunOnce(final Runnable delegate) { + this.delegate = Objects.requireNonNull(delegate); + this.hasRun = new AtomicBoolean(false); + } + + @Override + public void run() { + if (hasRun.compareAndSet(false, true)) { + delegate.run(); + } + } + + /** + * {@code true} if the {@link RunOnce} has been executed once. + */ + public boolean hasRun() { + return hasRun.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java index 797d6227561..17bf59a104a 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java @@ -23,12 +23,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -188,8 +189,12 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed { synchronized (delayedPrepareBulkRequestReference) { TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize); logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay); - delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), - delay, new RunOnce(prepareBulkRequestRunnable))); + try { + delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), + delay, new RunOnce(prepareBulkRequestRunnable))); + } catch (EsRejectedExecutionException e) { + prepareBulkRequestRunnable.onRejection(e); + } } } @@ -242,25 +247,17 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed { class DelayedPrepareBulkRequest { private final ThreadPool threadPool; - private final AbstractRunnable command; + private final Runnable command; private final float requestsPerSecond; private final ScheduledFuture future; - DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, AbstractRunnable command) { + DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) { this.threadPool = threadPool; this.requestsPerSecond = requestsPerSecond; this.command = command; - this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - throttledNanos.addAndGet(delay.nanos()); - command.run(); - } - - @Override - public void onFailure(Exception e) { - command.onFailure(e); - } + this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> { + throttledNanos.addAndGet(delay.nanos()); + command.run(); }); } @@ -302,29 +299,4 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed { return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond)); } } - - /** - * Runnable that can only be run one time. This is paranoia to prevent furiously rethrottling from running the command multiple times. - * Without it the command would be run multiple times. - */ - private static class RunOnce extends AbstractRunnable { - private final AtomicBoolean hasRun = new AtomicBoolean(false); - private final AbstractRunnable delegate; - - RunOnce(AbstractRunnable delegate) { - this.delegate = delegate; - } - - @Override - protected void doRun() throws Exception { - if (hasRun.compareAndSet(false, true)) { - delegate.run(); - } - } - - @Override - public void onFailure(Exception e) { - delegate.onFailure(e); - } - } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java new file mode 100644 index 00000000000..e833edc9d56 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class RunOnceTests extends ESTestCase { + + public void testRunOnce() { + final AtomicInteger counter = new AtomicInteger(0); + final RunOnce runOnce = new RunOnce(counter::incrementAndGet); + assertFalse(runOnce.hasRun()); + + runOnce.run(); + assertTrue(runOnce.hasRun()); + assertEquals(1, counter.get()); + + runOnce.run(); + assertTrue(runOnce.hasRun()); + assertEquals(1, counter.get()); + } + + public void testRunOnceConcurrently() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(0); + final RunOnce runOnce = new RunOnce(counter::incrementAndGet); + + final Thread[] threads = new Thread[between(3, 10)]; + final CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + runOnce.run(); + }); + threads[i].start(); + } + + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + assertTrue(runOnce.hasRun()); + assertEquals(1, counter.get()); + } + + public void testRunOnceWithAbstractRunnable() { + final AtomicInteger onRun = new AtomicInteger(0); + final AtomicInteger onFailure = new AtomicInteger(0); + final AtomicInteger onAfter = new AtomicInteger(0); + + final RunOnce runOnce = new RunOnce(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + onRun.incrementAndGet(); + throw new RuntimeException("failure"); + } + + @Override + public void onFailure(Exception e) { + onFailure.incrementAndGet(); + } + + @Override + public void onAfter() { + onAfter.incrementAndGet(); + } + }); + + final int iterations = randomIntBetween(1, 10); + for (int i = 0; i < iterations; i++) { + runOnce.run(); + assertEquals(1, onRun.get()); + assertEquals(1, onFailure.get()); + assertEquals(1, onAfter.get()); + assertTrue(runOnce.hasRun()); + } + } +}