diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 5ac4428c416..e240485fe62 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -157,8 +157,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId(); final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class); final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); - final RepositoryData repositoryData = - PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f))); + final RepositoryData repositoryData = getRepositoryData(repository); final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); final BytesReference serialized = diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 11993d88738..e0d82b0d707 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -23,7 +23,6 @@ import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -110,7 +109,6 @@ import org.elasticsearch.test.engine.MockEngineSupport; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.StubbableTransport; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; @@ -663,10 +661,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { logger.info("--> request recoveries"); RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); - ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class); Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME); - final RepositoryData repositoryData = PlainActionFuture.get(f -> - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData))); + final RepositoryData repositoryData = PlainActionFuture.get(repository::getRepositoryData); for (Map.Entry> indexRecoveryStates : response.shardRecoveryStates().entrySet()) { assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 064fe272ff2..97f568c34ac 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -81,7 +81,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA @Override protected String executor() { - return ThreadPool.Names.GENERIC; + return ThreadPool.Names.SAME; } @Inject diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index cd9715a9720..89ccf6824d4 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -349,62 +349,52 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { - threadPool.generic().execute(new AbstractRunnable() { - @Override - protected void doRun() { - final RepositoryMetadata repositoryMetadataStart = metadata; - getRepositoryData(ActionListener.wrap(repositoryData -> { - final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); - clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { + final RepositoryMetadata repositoryMetadataStart = metadata; + getRepositoryData(ActionListener.wrap(repositoryData -> { + final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { - private boolean executedTask = false; + private boolean executedTask = false; - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // Comparing the full metadata here on purpose instead of simply comparing the safe generation. - // If the safe generation has changed, then we have to reload repository data and start over. - // If the pending generation has changed we are in the midst of a write operation and might pick up the - // updated repository data and state on the retry. We don't want to wait for the write to finish though - // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state - // to change in any form. - if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { - executedTask = true; - return updateTask.execute(currentState); - } - return currentState; - } + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + executedTask = true; + return updateTask.execute(currentState); + } + return currentState; + } - @Override - public void onFailure(String source, Exception e) { - if (executedTask) { - updateTask.onFailure(source, e); - } else { - onFailure.accept(e); - } - } + @Override + public void onFailure(String source, Exception e) { + if (executedTask) { + updateTask.onFailure(source, e); + } else { + onFailure.accept(e); + } + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (executedTask) { - updateTask.clusterStateProcessed(source, oldState, newState); - } else { - executeConsistentStateUpdate(createUpdateTask, source, onFailure); - } - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (executedTask) { + updateTask.clusterStateProcessed(source, oldState, newState); + } else { + executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + } - @Override - public TimeValue timeout() { - return updateTask.timeout(); - } - }); - }, onFailure)); - } - - @Override - public void onFailure(Exception e) { - onFailure.accept(e); - } - }); + @Override + public TimeValue timeout() { + return updateTask.timeout(); + } + }); + }, onFailure)); } // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates @@ -580,17 +570,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); } else { - try { - final Map rootBlobs = blobContainer().listBlobs(); - final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); - // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never - // delete an index that was created by another master node after writing this index-N blob. - final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); - doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); - } catch (Exception ex) { - listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, ex)); - } + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + final Map rootBlobs = blobContainer().listBlobs(); + final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs); + // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never + // delete an index that was created by another master node after writing this index-N blob. + final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); + doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, + SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e)); + } + }); } } @@ -1197,6 +1193,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp listener.onFailure(corruptedStateException(null)); return; } + final Tuple cached = latestKnownRepositoryData.get(); + // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with + // the latest known repository generation + if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) { + try { + listener.onResponse(repositoryDataFromCachedEntry(cached)); + } catch (Exception e) { + listener.onFailure(e); + } + return; + } + // Slow path if we were not able to safely read the repository data from cache + threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + } + + private void doGetRepositoryData(ActionListener listener) { // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. // Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same // generation repeatedly. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index acf244db5d2..34e942d706a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; @@ -1546,17 +1545,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus */ private void deleteSnapshotsFromRepository(String repoName, Collection snapshotIds, @Nullable ActionListener listener, long repositoryStateId, Version minNodeVersion) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { - Repository repository = repositoriesService.repository(repoName); - repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds, + Repository repository = repositoriesService.repository(repoName); + repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots( + snapshotIds, repositoryStateId, minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds), ActionListener.wrap(v -> { - logger.info("snapshots {} deleted", snapshotIds); - removeSnapshotDeletionFromClusterState(snapshotIds, null, l); - }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l) - )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l))); - })); + logger.info("snapshots {} deleted", snapshotIds); + removeSnapshotDeletionFromClusterState(snapshotIds, null, listener); + }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener) + )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener))); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 781edb53eb5..d36c69d0c6b 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -133,16 +133,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { } protected RepositoryData getRepositoryData(String repository) { - return getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository)); + return getRepositoryData(internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repository)); } protected RepositoryData getRepositoryData(Repository repository) { - ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); - final PlainActionFuture repositoryData = PlainActionFuture.newFuture(); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - repository.getRepositoryData(repositoryData); - }); - return repositoryData.actionGet(); + return PlainActionFuture.get(repository::getRepositoryData); } public static long getFailureCount(String repository) {