Use RunOnce when appropriate (#35553)

This pull request replaces some blocks of code that must be run once 
and that are currently based on AtomicBoolean by the convient RunOnce 
class added in #35489.
This commit is contained in:
Tanguy Leroux 2018-11-15 09:24:40 +01:00 committed by GitHub
parent cb8bdeae68
commit c9b4ef0dfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 37 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -124,12 +125,12 @@ final class IndexShardOperationPermits implements Closeable {
delayOperations(); delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
final AtomicBoolean released = new AtomicBoolean(false); final RunOnce released = new RunOnce(() -> releaseDelayedOperations());
@Override @Override
public void onFailure(final Exception e) { public void onFailure(final Exception e) {
try { try {
releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible released.run(); // resume delayed operations as soon as possible
} finally { } finally {
onAcquired.onFailure(e); onAcquired.onFailure(e);
} }
@ -142,16 +143,10 @@ final class IndexShardOperationPermits implements Closeable {
try { try {
releasable.close(); releasable.close();
} finally { } finally {
releaseDelayedOperationsIfNeeded(); released.run();
} }
}); });
} }
private void releaseDelayedOperationsIfNeeded() {
if (released.compareAndSet(false, true)) {
releaseDelayedOperations();
}
}
}); });
} }
@ -173,13 +168,11 @@ final class IndexShardOperationPermits implements Closeable {
} }
} }
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
final AtomicBoolean closed = new AtomicBoolean(); final RunOnce release = new RunOnce(() -> {
return () -> { assert semaphore.availablePermits() == 0;
if (closed.compareAndSet(false, true)) { semaphore.release(TOTAL_PERMITS);
assert semaphore.availablePermits() == 0; });
semaphore.release(TOTAL_PERMITS); return release::run;
}
};
} else { } else {
throw new TimeoutException("timeout while blocking operations"); throw new TimeoutException("timeout while blocking operations");
} }

View File

@ -20,7 +20,6 @@
package org.elasticsearch.test.transport; package org.elasticsearch.test.transport;
import com.carrotsearch.randomizedtesting.SysGlobals; import com.carrotsearch.randomizedtesting.SysGlobals;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterModule;
@ -38,6 +37,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
@ -67,7 +67,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -349,9 +349,7 @@ public final class MockTransportService extends TransportService {
request.writeTo(bStream); request.writeTo(bStream);
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
Runnable runnable = new AbstractRunnable() { final RunOnce runnable = new RunOnce(new AbstractRunnable() {
AtomicBoolean requestSent = new AtomicBoolean();
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
logger.debug("failed to send delayed request", e); logger.debug("failed to send delayed request", e);
@ -359,11 +357,9 @@ public final class MockTransportService extends TransportService {
@Override @Override
protected void doRun() throws IOException { protected void doRun() throws IOException {
if (requestSent.compareAndSet(false, true)) { connection.sendRequest(requestId, action, clonedRequest, options);
connection.sendRequest(requestId, action, clonedRequest, options);
}
} }
}; });
// store the request to send it once the rule is cleared. // store the request to send it once the rule is cleared.
synchronized (this) { synchronized (this) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process.autodetect.output; package org.elasticsearch.xpack.ml.job.process.autodetect.output;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import java.time.Duration; import java.time.Duration;
@ -14,16 +15,21 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
class FlushListener { class FlushListener {
final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>(); final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>();
final AtomicBoolean cleared = new AtomicBoolean(false); final RunOnce onClear = new RunOnce(() -> {
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
while (latches.hasNext()) {
latches.next().getValue().latch.countDown();
latches.remove();
}
});
@Nullable @Nullable
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException { FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
if (cleared.get()) { if (onClear.hasRun()) {
return null; return null;
} }
@ -49,13 +55,7 @@ class FlushListener {
} }
void clear() { void clear() {
if (cleared.compareAndSet(false, true)) { onClear.run();
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
while (latches.hasNext()) {
latches.next().getValue().latch.countDown();
latches.remove();
}
}
} }
private static class FlushAcknowledgementHolder { private static class FlushAcknowledgementHolder {

View File

@ -60,7 +60,7 @@ public class FlushListenerTests extends ESTestCase {
} }
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size())); assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false)); assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false));
assertFalse(listener.cleared.get()); assertFalse(listener.onClear.hasRun());
listener.clear(); listener.clear();
@ -68,6 +68,6 @@ public class FlushListenerTests extends ESTestCase {
assertBusy(() -> assertNotNull(f.get())); assertBusy(() -> assertNotNull(f.get()));
} }
assertTrue(listener.awaitingFlushed.isEmpty()); assertTrue(listener.awaitingFlushed.isEmpty());
assertTrue(listener.cleared.get()); assertTrue(listener.onClear.hasRun());
} }
} }