Simplify some Common ActionRunnable Uses (#47799) (#47828)

Especially in the snapshot code there's a lot
of logic chaining `ActionRunnables` in tricky
ways now and the code is getting hard to follow.
This change introduces two convinience methods that
make it clear that a wrapped listener is invoked with
certainty in some trickier spots and shortens the code a bit.
This commit is contained in:
Armin Braun 2019-10-09 23:29:50 +02:00 committed by GitHub
parent 14b979a7bc
commit 302e09decf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 118 additions and 157 deletions

View File

@ -147,13 +147,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.execute(new ActionRunnable<Void>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
listener.onResponse(null);
}
});
executor.execute(ActionRunnable.run(listener, () -> deleteBlobIgnoringIfNotExists(blobName)));
}
}
try {

View File

@ -20,6 +20,8 @@
package org.elasticsearch.action;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
/**
@ -30,6 +32,32 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
protected final ActionListener<Response> listener;
/**
* Creates a {@link Runnable} that invokes the given listener with {@code null} after the given runnable has executed.
* @param listener Listener to invoke
* @param runnable Runnable to execute
* @return Wrapped {@code Runnable}
*/
public static <T> ActionRunnable<T> run(ActionListener<T> listener, CheckedRunnable<Exception> runnable) {
return new ActionRunnable<T>(listener) {
@Override
protected void doRun() throws Exception {
runnable.run();
listener.onResponse(null);
}
};
}
/**
* Creates a {@link Runnable} that invokes the given listener with the return of the given supplier.
* @param listener Listener to invoke
* @param supplier Supplier that provides value to pass to listener
* @return Wrapped {@code Runnable}
*/
public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
}
/**
* Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run.
* Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer.

View File

@ -121,8 +121,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(
nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute(
ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots(
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure));
ActionRunnable.supply(listener, () -> buildResponse(request, snapshotsService.currentSnapshots(
request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses))), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
listener.onResponse(buildResponse(request, currentSnapshots, null));

View File

@ -300,6 +300,6 @@ public abstract class TransportBroadcastAction<
private void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
transportService.getThreadPool().executor(shardExecutor)
.execute(ActionRunnable.wrap(listener, l -> l.onResponse(shardOperation(request, task))));
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, task)));
}
}

View File

@ -107,7 +107,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(getExecutor(request, shardId))
.execute(ActionRunnable.wrap(listener, l -> l.onResponse((shardOperation(request, shardId)))));
.execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
}
protected abstract Writeable.Reader<Response> getResponseReader();

View File

@ -409,10 +409,7 @@ public class RecoverySourceHandler {
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
store.decRef();
l.onResponse(null);
}));
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
FutureUtils.get(future);
});
}

View File

@ -490,14 +490,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}, listener::onFailure), 2);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
executor.execute(ActionRunnable.wrap(groupedListener, l -> {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs = cleanupStaleRootFiles(staleRootBlobs(newRepoData, rootBlobs.keySet()));
l.onResponse(
new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
executor.execute(ActionRunnable.wrap(groupedListener, l -> l.onResponse(cleanupStaleIndices(foundIndices, survivingIndexIds))));
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
}
/**
@ -712,26 +711,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// that decrements the generation it points at
// Write Global MetaData
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
l.onResponse(null);
}));
executor.execute(ActionRunnable.run(allMetaListener,
() -> globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false)));
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
l.onResponse(null);
}));
executor.execute(ActionRunnable.run(allMetaListener, () ->
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false)));
}
executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> {
executor.execute(ActionRunnable.supply(allMetaListener, () -> {
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);
snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
afterMetaListener.onResponse(snapshotInfo);
return snapshotInfo;
}));
}

View File

@ -134,7 +134,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
/**
* Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
* to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not
* noticeably slow down searches.
*/
public static final Setting<Boolean> LOW_LEVEL_CANCELLATION_SETTING =
@ -341,7 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
getExecutor(id).execute(ActionRunnable.wrap(listener, l -> l.onResponse(executable.get())));
getExecutor(id).execute(ActionRunnable.supply(listener, executable::get));
}
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
@ -1053,7 +1053,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Executor executor = getExecutor(shard);
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
// now we need to check if there is a pending refresh and register
shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.wrap(listener, l -> l.onResponse(request)))),
shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))),
listener::onFailure);
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not

View File

@ -81,13 +81,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
private void deleteAndAssertEmpty(BlobPath path) throws Exception {
final BlobStoreRepository repo = getRepository();
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repo.threadPool().generic().execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
repo.blobStore().blobContainer(path).delete();
future.onResponse(null);
}
});
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> repo.blobStore().blobContainer(path).delete()));
future.actionGet();
final BlobPath parent = path.parent();
if (parent == null) {
@ -146,19 +140,15 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final Executor genericExec = repo.threadPool().generic();
final int testBlobLen = randomIntBetween(1, 100);
genericExec.execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repo.blobStore();
blobStore.blobContainer(repo.basePath().add("foo"))
.writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
.writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
.writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
future.onResponse(null);
}
});
genericExec.execute(ActionRunnable.run(future, () -> {
final BlobStore blobStore = repo.blobStore();
blobStore.blobContainer(repo.basePath().add("foo"))
.writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
.writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
.writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
}));
future.actionGet();
assertChildren(repo.basePath(), Collections.singleton("foo"));
assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap());
@ -243,37 +233,27 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
private void createDanglingIndex(final BlobStoreRepository repo, final Executor genericExec) throws Exception {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
genericExec.execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repo.blobStore();
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
.writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false);
for (String prefix : Arrays.asList("snap-", "meta-")) {
blobStore.blobContainer(repo.basePath())
.writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
}
future.onResponse(null);
genericExec.execute(ActionRunnable.run(future, () -> {
final BlobStore blobStore = repo.blobStore();
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
.writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false);
for (String prefix : Arrays.asList("snap-", "meta-")) {
blobStore.blobContainer(repo.basePath()).writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
}
});
}));
future.actionGet();
assertTrue(assertCorruptionVisible(repo, genericExec));
}
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
executor.execute(new ActionRunnable<Boolean>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repo.blobStore();
future.onResponse(
blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat")
);
}
});
executor.execute(ActionRunnable.supply(future, () -> {
final BlobStore blobStore = repo.blobStore();
return blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat")
&& BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat");
}));
return future.actionGet();
}
@ -298,13 +278,8 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
private Set<String> listChildren(BlobPath path) {
final PlainActionFuture<Set<String>> future = PlainActionFuture.newFuture();
final BlobStoreRepository repository = getRepository();
repository.threadPool().generic().execute(new ActionRunnable<Set<String>>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repository.blobStore();
future.onResponse(blobStore.blobContainer(path).children().keySet());
}
});
repository.threadPool().generic().execute(
ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).children().keySet()));
return future.actionGet();
}

View File

@ -89,28 +89,24 @@ public final class BlobStoreTestUtil {
*/
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
executor.execute(new ActionRunnable<Void>(listener) {
@Override
protected void doRun() throws Exception {
final BlobContainer blobContainer = repository.blobContainer();
final long latestGen;
try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) {
latestGen = inputStream.readLong();
} catch (NoSuchFileException e) {
throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
}
assertIndexGenerations(blobContainer, latestGen);
final RepositoryData repositoryData;
try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, blob)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
}
assertIndexUUIDs(blobContainer, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
listener.onResponse(null);
executor.execute(ActionRunnable.run(listener, () -> {
final BlobContainer blobContainer = repository.blobContainer();
final long latestGen;
try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) {
latestGen = inputStream.readLong();
} catch (NoSuchFileException e) {
throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
}
});
assertIndexGenerations(blobContainer, latestGen);
final RepositoryData repositoryData;
try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, blob)) {
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
}
assertIndexUUIDs(blobContainer, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
}));
listener.actionGet(TimeValue.timeValueMinutes(1L));
}
@ -186,60 +182,46 @@ public final class BlobStoreTestUtil {
throws InterruptedException, ExecutionException {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final AtomicLong totalSize = new AtomicLong();
repository.threadPool().generic().execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repository.blobStore();
BlobContainer container =
blobStore.blobContainer(repository.basePath().add("indices").add(name));
for (String file : files) {
int size = randomIntBetween(0, 10);
totalSize.addAndGet(size);
container.writeBlob(file, new ByteArrayInputStream(new byte[size]), size, false);
}
future.onResponse(null);
repository.threadPool().generic().execute(ActionRunnable.run(future, () -> {
final BlobStore blobStore = repository.blobStore();
BlobContainer container =
blobStore.blobContainer(repository.basePath().add("indices").add(name));
for (String file : files) {
int size = randomIntBetween(0, 10);
totalSize.addAndGet(size);
container.writeBlob(file, new ByteArrayInputStream(new byte[size]), size, false);
}
});
}));
future.get();
return totalSize.get();
}
public static void assertCorruptionVisible(BlobStoreRepository repository, Map<String, Set<String>> indexToFiles) {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(new ActionRunnable<Boolean>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repository.blobStore();
for (String index : indexToFiles.keySet()) {
if (blobStore.blobContainer(repository.basePath().add("indices"))
.children().containsKey(index) == false) {
future.onResponse(false);
return;
}
for (String file : indexToFiles.get(index)) {
try (InputStream ignored =
blobStore.blobContainer(repository.basePath().add("indices").add(index)).readBlob(file)) {
} catch (NoSuchFileException e) {
future.onResponse(false);
return;
}
repository.threadPool().generic().execute(ActionRunnable.supply(future, () -> {
final BlobStore blobStore = repository.blobStore();
for (String index : indexToFiles.keySet()) {
if (blobStore.blobContainer(repository.basePath().add("indices"))
.children().containsKey(index) == false) {
return false;
}
for (String file : indexToFiles.get(index)) {
try (InputStream ignored =
blobStore.blobContainer(repository.basePath().add("indices").add(index)).readBlob(file)) {
} catch (NoSuchFileException e) {
return false;
}
}
future.onResponse(true);
}
});
return true;
}));
assertTrue(future.actionGet());
}
public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map<String, BlobMetaData> blobs) {
final PlainActionFuture<Map<String, BlobMetaData>> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(new ActionRunnable<Map<String, BlobMetaData>>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repository.blobStore();
future.onResponse(blobStore.blobContainer(path).listBlobsByPrefix(prefix));
}
});
repository.threadPool().generic().execute(
ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).listBlobsByPrefix(prefix)));
Map<String, BlobMetaData> foundBlobs = future.actionGet();
if (blobs.isEmpty()) {
assertThat(foundBlobs.keySet(), empty());

View File

@ -83,13 +83,8 @@ class LdapUserSearchSessionFactory extends PoolingSessionFactory {
final String dn = entry.getDN();
final byte[] passwordBytes = CharArrays.toUtf8Bytes(password.getChars());
final SimpleBindRequest bind = new SimpleBindRequest(dn, passwordBytes);
LdapUtils.maybeForkThenBindAndRevert(connectionPool, bind, threadPool, new ActionRunnable<LdapSession>(listener) {
@Override
protected void doRun() throws Exception {
listener.onResponse(new LdapSession(logger, config, connectionPool, dn, groupResolver, metaDataResolver, timeout,
entry.getAttributes()));
}
});
LdapUtils.maybeForkThenBindAndRevert(connectionPool, bind, threadPool, ActionRunnable.supply(listener, () ->
new LdapSession(logger, config, connectionPool, dn, groupResolver, metaDataResolver, timeout, entry.getAttributes())));
}
}, listener::onFailure));
}

View File

@ -49,13 +49,8 @@ public class S3CleanupTests extends ESSingleNodeTestCase {
createRepository("test-repo");
repository = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
repository.blobStore().blobContainer(repository.basePath()).delete();
future.onResponse(null);
}
});
repository.threadPool().generic().execute(ActionRunnable.run(future,
() -> repository.blobStore().blobContainer(repository.basePath()).delete()));
future.actionGet();
assertBusy(() -> BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap()), 10L,
TimeUnit.MINUTES);