diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 67c48c38791..d5d0d7f3e97 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -124,12 +125,12 @@ final class IndexShardOperationPermits implements Closeable { delayOperations(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { - final AtomicBoolean released = new AtomicBoolean(false); + final RunOnce released = new RunOnce(() -> releaseDelayedOperations()); @Override public void onFailure(final Exception e) { try { - releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible + released.run(); // resume delayed operations as soon as possible } finally { onAcquired.onFailure(e); } @@ -142,16 +143,10 @@ final class IndexShardOperationPermits implements Closeable { try { releasable.close(); } finally { - releaseDelayedOperationsIfNeeded(); + released.run(); } }); } - - private void releaseDelayedOperationsIfNeeded() { - if (released.compareAndSet(false, true)) { - releaseDelayedOperations(); - } - } }); } @@ -173,13 +168,11 @@ final class IndexShardOperationPermits implements Closeable { } } if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - final AtomicBoolean closed = new AtomicBoolean(); - return () -> { - if (closed.compareAndSet(false, true)) { - assert semaphore.availablePermits() == 0; - semaphore.release(TOTAL_PERMITS); - } - }; + final RunOnce release = new RunOnce(() -> { + assert semaphore.availablePermits() == 0; + semaphore.release(TOTAL_PERMITS); + }); + return release::run; } else { throw new TimeoutException("timeout while blocking operations"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 3fc4d030da0..3a86ed30dc5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -20,7 +20,6 @@ package org.elasticsearch.test.transport; import com.carrotsearch.randomizedtesting.SysGlobals; -import java.util.concurrent.TimeUnit; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterModule; @@ -38,6 +37,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; @@ -67,7 +67,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -349,9 +349,7 @@ public final class MockTransportService extends TransportService { request.writeTo(bStream); final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); - Runnable runnable = new AbstractRunnable() { - AtomicBoolean requestSent = new AtomicBoolean(); - + final RunOnce runnable = new RunOnce(new AbstractRunnable() { @Override public void onFailure(Exception e) { logger.debug("failed to send delayed request", e); @@ -359,11 +357,9 @@ public final class MockTransportService extends TransportService { @Override protected void doRun() throws IOException { - if (requestSent.compareAndSet(false, true)) { - connection.sendRequest(requestId, action, clonedRequest, options); - } + connection.sendRequest(requestId, action, clonedRequest, options); } - }; + }); // store the request to send it once the rule is cleared. synchronized (this) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 1340556fbcd..0028bfef928 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect.output; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import java.time.Duration; @@ -14,16 +15,21 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; class FlushListener { final ConcurrentMap awaitingFlushed = new ConcurrentHashMap<>(); - final AtomicBoolean cleared = new AtomicBoolean(false); + final RunOnce onClear = new RunOnce(() -> { + Iterator> latches = awaitingFlushed.entrySet().iterator(); + while (latches.hasNext()) { + latches.next().getValue().latch.countDown(); + latches.remove(); + } + }); @Nullable FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException { - if (cleared.get()) { + if (onClear.hasRun()) { return null; } @@ -49,13 +55,7 @@ class FlushListener { } void clear() { - if (cleared.compareAndSet(false, true)) { - Iterator> latches = awaitingFlushed.entrySet().iterator(); - while (latches.hasNext()) { - latches.next().getValue().latch.countDown(); - latches.remove(); - } - } + onClear.run(); } private static class FlushAcknowledgementHolder { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index 3bcedb52392..3343882d581 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -60,7 +60,7 @@ public class FlushListenerTests extends ESTestCase { } assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size())); assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false)); - assertFalse(listener.cleared.get()); + assertFalse(listener.onClear.hasRun()); listener.clear(); @@ -68,6 +68,6 @@ public class FlushListenerTests extends ESTestCase { assertBusy(() -> assertNotNull(f.get())); } assertTrue(listener.awaitingFlushed.isEmpty()); - assertTrue(listener.cleared.get()); + assertTrue(listener.onClear.hasRun()); } }