From 302e09decfb4537c98eeef42cdde0de9fe7834af Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 9 Oct 2019 23:29:50 +0200 Subject: [PATCH] Simplify some Common ActionRunnable Uses (#47799) (#47828) Especially in the snapshot code there's a lot of logic chaining `ActionRunnables` in tricky ways now and the code is getting hard to follow. This change introduces two convinience methods that make it clear that a wrapped listener is invoked with certainty in some trickier spots and shortens the code a bit. --- .../azure/AzureBlobContainer.java | 8 +- .../elasticsearch/action/ActionRunnable.java | 28 +++++ .../TransportSnapshotsStatusAction.java | 4 +- .../broadcast/TransportBroadcastAction.java | 2 +- .../shard/TransportSingleShardAction.java | 2 +- .../recovery/RecoverySourceHandler.java | 5 +- .../blobstore/BlobStoreRepository.java | 23 ++-- .../elasticsearch/search/SearchService.java | 6 +- .../AbstractThirdPartyRepositoryTestCase.java | 77 +++++-------- .../blobstore/BlobStoreTestUtil.java | 102 ++++++++---------- .../ldap/LdapUserSearchSessionFactory.java | 9 +- .../snapshots/S3CleanupTests.java | 9 +- 12 files changed, 118 insertions(+), 157 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 37963648a74..aaf7dc6391b 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -147,13 +147,7 @@ public class AzureBlobContainer extends AbstractBlobContainer { // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. for (String blobName : blobNames) { - executor.execute(new ActionRunnable(listener) { - @Override - protected void doRun() throws IOException { - deleteBlobIgnoringIfNotExists(blobName); - listener.onResponse(null); - } - }); + executor.execute(ActionRunnable.run(listener, () -> deleteBlobIgnoringIfNotExists(blobName))); } } try { diff --git a/server/src/main/java/org/elasticsearch/action/ActionRunnable.java b/server/src/main/java/org/elasticsearch/action/ActionRunnable.java index 86be481f27f..f9ab268fab8 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionRunnable.java +++ b/server/src/main/java/org/elasticsearch/action/ActionRunnable.java @@ -20,6 +20,8 @@ package org.elasticsearch.action; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.util.concurrent.AbstractRunnable; /** @@ -30,6 +32,32 @@ public abstract class ActionRunnable extends AbstractRunnable { protected final ActionListener listener; + /** + * Creates a {@link Runnable} that invokes the given listener with {@code null} after the given runnable has executed. + * @param listener Listener to invoke + * @param runnable Runnable to execute + * @return Wrapped {@code Runnable} + */ + public static ActionRunnable run(ActionListener listener, CheckedRunnable runnable) { + return new ActionRunnable(listener) { + @Override + protected void doRun() throws Exception { + runnable.run(); + listener.onResponse(null); + } + }; + } + + /** + * Creates a {@link Runnable} that invokes the given listener with the return of the given supplier. + * @param listener Listener to invoke + * @param supplier Supplier that provides value to pass to listener + * @return Wrapped {@code Runnable} + */ + public static ActionRunnable supply(ActionListener listener, CheckedSupplier supplier) { + return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get())); + } + /** * Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run. * Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 1d0c3ed4d8c..7644c9d82c4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -121,8 +121,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction threadPool.executor(ThreadPool.Names.GENERIC).execute( - ActionRunnable.wrap(listener, l -> l.onResponse(buildResponse(request, snapshotsService.currentSnapshots( - request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses)))), listener::onFailure)); + ActionRunnable.supply(listener, () -> buildResponse(request, snapshotsService.currentSnapshots( + request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses))), listener::onFailure)); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index c9ca9aa4c67..6725c92c808 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -300,6 +300,6 @@ public abstract class TransportBroadcastAction< private void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { transportService.getThreadPool().executor(shardExecutor) - .execute(ActionRunnable.wrap(listener, l -> l.onResponse(shardOperation(request, task)))); + .execute(ActionRunnable.supply(listener, () -> shardOperation(request, task))); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 8fae3ebab54..cc75c3863e2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -107,7 +107,7 @@ public abstract class TransportSingleShardAction listener) throws IOException { threadPool.executor(getExecutor(request, shardId)) - .execute(ActionRunnable.wrap(listener, l -> l.onResponse((shardOperation(request, shardId))))); + .execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId))); } protected abstract Writeable.Reader getResponseReader(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 0c976cabac9..439565ced36 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -409,10 +409,7 @@ public class RecoverySourceHandler { // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool. // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures // below and thus make it impossible for the store release to execute which in turn would block the futures forever - threadPool.generic().execute(ActionRunnable.wrap(future, l -> { - store.decRef(); - l.onResponse(null); - })); + threadPool.generic().execute(ActionRunnable.run(future, store::decRef)); FutureUtils.get(future); }); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4f83bdc0192..b81e7f1ea33 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -490,14 +490,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp }, listener::onFailure), 2); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - executor.execute(ActionRunnable.wrap(groupedListener, l -> { + executor.execute(ActionRunnable.supply(groupedListener, () -> { List deletedBlobs = cleanupStaleRootFiles(staleRootBlobs(newRepoData, rootBlobs.keySet())); - l.onResponse( - new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum())); + return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()); })); final Set survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); - executor.execute(ActionRunnable.wrap(groupedListener, l -> l.onResponse(cleanupStaleIndices(foundIndices, survivingIndexIds)))); + executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds))); } /** @@ -712,26 +711,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // that decrements the generation it points at // Write Global MetaData - executor.execute(ActionRunnable.wrap(allMetaListener, l -> { - globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false); - l.onResponse(null); - })); + executor.execute(ActionRunnable.run(allMetaListener, + () -> globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false))); // write the index metadata for each index in the snapshot for (IndexId index : indices) { - executor.execute(ActionRunnable.wrap(allMetaListener, l -> { - indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); - l.onResponse(null); - })); + executor.execute(ActionRunnable.run(allMetaListener, () -> + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false))); } - executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> { + executor.execute(ActionRunnable.supply(allMetaListener, () -> { final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, includeGlobalState, userMetadata); snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); - afterMetaListener.onResponse(snapshotInfo); + return snapshotInfo; })); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f3daa34eb4f..0b352028a75 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -134,7 +134,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv /** * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react - * to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not + * to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not * noticeably slow down searches. */ public static final Setting LOW_LEVEL_CANCELLATION_SETTING = @@ -341,7 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } private void runAsync(long id, Supplier executable, ActionListener listener) { - getExecutor(id).execute(ActionRunnable.wrap(listener, l -> l.onResponse(executable.get()))); + getExecutor(id).execute(ActionRunnable.supply(listener, executable::get)); } private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { @@ -1053,7 +1053,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Executor executor = getExecutor(shard); ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register - shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.wrap(listener, l -> l.onResponse(request)))), + shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.supply(listener, () -> request))), listener::onFailure); // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index afb8ac9454f..9b5ef595ce4 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -81,13 +81,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT private void deleteAndAssertEmpty(BlobPath path) throws Exception { final BlobStoreRepository repo = getRepository(); final PlainActionFuture future = PlainActionFuture.newFuture(); - repo.threadPool().generic().execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - repo.blobStore().blobContainer(path).delete(); - future.onResponse(null); - } - }); + repo.threadPool().generic().execute(ActionRunnable.run(future, () -> repo.blobStore().blobContainer(path).delete())); future.actionGet(); final BlobPath parent = path.parent(); if (parent == null) { @@ -146,19 +140,15 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT final PlainActionFuture future = PlainActionFuture.newFuture(); final Executor genericExec = repo.threadPool().generic(); final int testBlobLen = randomIntBetween(1, 100); - genericExec.execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repo.blobStore(); - blobStore.blobContainer(repo.basePath().add("foo")) - .writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); - blobStore.blobContainer(repo.basePath().add("foo").add("nested")) - .writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); - blobStore.blobContainer(repo.basePath().add("foo").add("nested2")) - .writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); - future.onResponse(null); - } - }); + genericExec.execute(ActionRunnable.run(future, () -> { + final BlobStore blobStore = repo.blobStore(); + blobStore.blobContainer(repo.basePath().add("foo")) + .writeBlob("nested-blob", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + blobStore.blobContainer(repo.basePath().add("foo").add("nested")) + .writeBlob("bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + blobStore.blobContainer(repo.basePath().add("foo").add("nested2")) + .writeBlob("blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false); + })); future.actionGet(); assertChildren(repo.basePath(), Collections.singleton("foo")); assertBlobsByPrefix(repo.basePath(), "fo", Collections.emptyMap()); @@ -243,37 +233,27 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT private void createDanglingIndex(final BlobStoreRepository repo, final Executor genericExec) throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); - genericExec.execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repo.blobStore(); - blobStore.blobContainer(repo.basePath().add("indices").add("foo")) - .writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false); - for (String prefix : Arrays.asList("snap-", "meta-")) { - blobStore.blobContainer(repo.basePath()) - .writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false); - } - future.onResponse(null); + genericExec.execute(ActionRunnable.run(future, () -> { + final BlobStore blobStore = repo.blobStore(); + blobStore.blobContainer(repo.basePath().add("indices").add("foo")) + .writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false); + for (String prefix : Arrays.asList("snap-", "meta-")) { + blobStore.blobContainer(repo.basePath()).writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false); } - }); + })); future.actionGet(); assertTrue(assertCorruptionVisible(repo, genericExec)); } protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); - executor.execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repo.blobStore(); - future.onResponse( - blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo") - && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar") - && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat") - && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat") - ); - } - }); + executor.execute(ActionRunnable.supply(future, () -> { + final BlobStore blobStore = repo.blobStore(); + return blobStore.blobContainer(repo.basePath().add("indices")).children().containsKey("foo") + && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath().add("indices").add("foo")), "bar") + && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "meta-foo.dat") + && BlobStoreTestUtil.blobExists(blobStore.blobContainer(repo.basePath()), "snap-foo.dat"); + })); return future.actionGet(); } @@ -298,13 +278,8 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT private Set listChildren(BlobPath path) { final PlainActionFuture> future = PlainActionFuture.newFuture(); final BlobStoreRepository repository = getRepository(); - repository.threadPool().generic().execute(new ActionRunnable>(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repository.blobStore(); - future.onResponse(blobStore.blobContainer(path).children().keySet()); - } - }); + repository.threadPool().generic().execute( + ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).children().keySet())); return future.actionGet(); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index bbfaa0d9228..0438d940bbd 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -89,28 +89,24 @@ public final class BlobStoreTestUtil { */ public static void assertConsistency(BlobStoreRepository repository, Executor executor) { final PlainActionFuture listener = PlainActionFuture.newFuture(); - executor.execute(new ActionRunnable(listener) { - @Override - protected void doRun() throws Exception { - final BlobContainer blobContainer = repository.blobContainer(); - final long latestGen; - try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) { - latestGen = inputStream.readLong(); - } catch (NoSuchFileException e) { - throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]"); - } - assertIndexGenerations(blobContainer, latestGen); - final RepositoryData repositoryData; - try (InputStream blob = blobContainer.readBlob("index-" + latestGen); - XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, blob)) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); - } - assertIndexUUIDs(blobContainer, repositoryData); - assertSnapshotUUIDs(repository, repositoryData); - listener.onResponse(null); + executor.execute(ActionRunnable.run(listener, () -> { + final BlobContainer blobContainer = repository.blobContainer(); + final long latestGen; + try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) { + latestGen = inputStream.readLong(); + } catch (NoSuchFileException e) { + throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]"); } - }); + assertIndexGenerations(blobContainer, latestGen); + final RepositoryData repositoryData; + try (InputStream blob = blobContainer.readBlob("index-" + latestGen); + XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, blob)) { + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); + } + assertIndexUUIDs(blobContainer, repositoryData); + assertSnapshotUUIDs(repository, repositoryData); + })); listener.actionGet(TimeValue.timeValueMinutes(1L)); } @@ -186,60 +182,46 @@ public final class BlobStoreTestUtil { throws InterruptedException, ExecutionException { final PlainActionFuture future = PlainActionFuture.newFuture(); final AtomicLong totalSize = new AtomicLong(); - repository.threadPool().generic().execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repository.blobStore(); - BlobContainer container = - blobStore.blobContainer(repository.basePath().add("indices").add(name)); - for (String file : files) { - int size = randomIntBetween(0, 10); - totalSize.addAndGet(size); - container.writeBlob(file, new ByteArrayInputStream(new byte[size]), size, false); - } - future.onResponse(null); + repository.threadPool().generic().execute(ActionRunnable.run(future, () -> { + final BlobStore blobStore = repository.blobStore(); + BlobContainer container = + blobStore.blobContainer(repository.basePath().add("indices").add(name)); + for (String file : files) { + int size = randomIntBetween(0, 10); + totalSize.addAndGet(size); + container.writeBlob(file, new ByteArrayInputStream(new byte[size]), size, false); } - }); + })); future.get(); return totalSize.get(); } public static void assertCorruptionVisible(BlobStoreRepository repository, Map> indexToFiles) { final PlainActionFuture future = PlainActionFuture.newFuture(); - repository.threadPool().generic().execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repository.blobStore(); - for (String index : indexToFiles.keySet()) { - if (blobStore.blobContainer(repository.basePath().add("indices")) - .children().containsKey(index) == false) { - future.onResponse(false); - return; - } - for (String file : indexToFiles.get(index)) { - try (InputStream ignored = - blobStore.blobContainer(repository.basePath().add("indices").add(index)).readBlob(file)) { - } catch (NoSuchFileException e) { - future.onResponse(false); - return; - } + repository.threadPool().generic().execute(ActionRunnable.supply(future, () -> { + final BlobStore blobStore = repository.blobStore(); + for (String index : indexToFiles.keySet()) { + if (blobStore.blobContainer(repository.basePath().add("indices")) + .children().containsKey(index) == false) { + return false; + } + for (String file : indexToFiles.get(index)) { + try (InputStream ignored = + blobStore.blobContainer(repository.basePath().add("indices").add(index)).readBlob(file)) { + } catch (NoSuchFileException e) { + return false; } } - future.onResponse(true); } - }); + return true; + })); assertTrue(future.actionGet()); } public static void assertBlobsByPrefix(BlobStoreRepository repository, BlobPath path, String prefix, Map blobs) { final PlainActionFuture> future = PlainActionFuture.newFuture(); - repository.threadPool().generic().execute(new ActionRunnable>(future) { - @Override - protected void doRun() throws Exception { - final BlobStore blobStore = repository.blobStore(); - future.onResponse(blobStore.blobContainer(path).listBlobsByPrefix(prefix)); - } - }); + repository.threadPool().generic().execute( + ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).listBlobsByPrefix(prefix))); Map foundBlobs = future.actionGet(); if (blobs.isEmpty()) { assertThat(foundBlobs.keySet(), empty()); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapUserSearchSessionFactory.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapUserSearchSessionFactory.java index 64f3be516fa..24cbb9418f8 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapUserSearchSessionFactory.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapUserSearchSessionFactory.java @@ -83,13 +83,8 @@ class LdapUserSearchSessionFactory extends PoolingSessionFactory { final String dn = entry.getDN(); final byte[] passwordBytes = CharArrays.toUtf8Bytes(password.getChars()); final SimpleBindRequest bind = new SimpleBindRequest(dn, passwordBytes); - LdapUtils.maybeForkThenBindAndRevert(connectionPool, bind, threadPool, new ActionRunnable(listener) { - @Override - protected void doRun() throws Exception { - listener.onResponse(new LdapSession(logger, config, connectionPool, dn, groupResolver, metaDataResolver, timeout, - entry.getAttributes())); - } - }); + LdapUtils.maybeForkThenBindAndRevert(connectionPool, bind, threadPool, ActionRunnable.supply(listener, () -> + new LdapSession(logger, config, connectionPool, dn, groupResolver, metaDataResolver, timeout, entry.getAttributes()))); } }, listener::onFailure)); } diff --git a/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java b/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java index 5a80103f0b5..c33a5993248 100644 --- a/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java +++ b/x-pack/snapshot-tool/src/test/java/org/elasticsearch/snapshots/S3CleanupTests.java @@ -49,13 +49,8 @@ public class S3CleanupTests extends ESSingleNodeTestCase { createRepository("test-repo"); repository = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); final PlainActionFuture future = PlainActionFuture.newFuture(); - repository.threadPool().generic().execute(new ActionRunnable(future) { - @Override - protected void doRun() throws Exception { - repository.blobStore().blobContainer(repository.basePath()).delete(); - future.onResponse(null); - } - }); + repository.threadPool().generic().execute(ActionRunnable.run(future, + () -> repository.blobStore().blobContainer(repository.basePath()).delete())); future.actionGet(); assertBusy(() -> BlobStoreTestUtil.assertBlobsByPrefix(repository, repository.basePath(), "", Collections.emptyMap()), 10L, TimeUnit.MINUTES);