We don't need to switch to the generic or snapshot pool for loading cached repository data (i.e. most of the time in normal operation). This makes `executeConsistentStateUpdate` less heavy if it has to retry and lowers the chance of having to retry in the first place. Also, this change allowed simplifying a few other spots in the codebase where we would fork off to another pool just to load repository data.
parent: 665b7b7bd8
commit: 64c5f70a2d
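The core of the change is the new fast path in `BlobStoreRepository#getRepositoryData` (see the hunk at `@@ -1197,6 +1193,22 @@` below): when the cached entry matches the latest known repository generation, the listener is completed directly on the calling thread, and only the slow path forks to the generic pool. A minimal, standalone sketch of that pattern using JDK types only (all names here are illustrative, not the actual Elasticsearch API):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class CachedLoader<T> {
    private final AtomicLong latestKnownGen = new AtomicLong();
    // Cache entry: the generation the value was loaded at, plus the value itself.
    private final AtomicReference<Map.Entry<Long, T>> cached = new AtomicReference<>();
    private final ExecutorService slowPathPool;

    CachedLoader(ExecutorService slowPathPool) {
        this.slowPathPool = slowPathPool;
    }

    void load(Consumer<T> listener) {
        final Map.Entry<Long, T> entry = cached.get();
        // Fast path: the cached entry matches the latest known generation, so the
        // caller can be answered on this thread with no executor hop at all.
        if (entry != null && entry.getKey() == latestKnownGen.get()) {
            listener.accept(entry.getValue());
            return;
        }
        // Slow path: fork to a background pool for the blocking load, then cache.
        slowPathPool.execute(() -> {
            final long gen = latestKnownGen.get();
            final T loaded = loadFromStorage(gen);
            cached.set(new SimpleImmutableEntry<>(gen, loaded));
            listener.accept(loaded);
        });
    }

    private T loadFromStorage(long generation) {
        throw new UnsupportedOperationException("storage I/O elided in this sketch");
    }
}
```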
@@ -157,8 +157,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
             .setWaitForCompletion(true).setIndices().get().getSnapshotInfo().snapshotId();
         final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
         final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
-        final RepositoryData repositoryData =
-            PlainActionFuture.get(f -> repository.threadPool().generic().execute(() -> repository.getRepositoryData(f)));
+        final RepositoryData repositoryData = getRepositoryData(repository);
         final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
             SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
         final BytesReference serialized =
@@ -23,7 +23,6 @@ import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -110,7 +109,6 @@ import org.elasticsearch.test.engine.MockEngineSupport;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.test.transport.StubbableTransport;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportChannel;
@@ -663,10 +661,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
         logger.info("--> request recoveries");
         RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
 
-        ThreadPool threadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
         Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(REPO_NAME);
-        final RepositoryData repositoryData = PlainActionFuture.get(f ->
-            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(f, repository::getRepositoryData)));
+        final RepositoryData repositoryData = PlainActionFuture.get(repository::getRepositoryData);
         for (Map.Entry<String, List<RecoveryState>> indexRecoveryStates : response.shardRecoveryStates().entrySet()) {
 
             assertThat(indexRecoveryStates.getKey(), equalTo(INDEX_NAME));
@@ -81,7 +81,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
 
     @Override
     protected String executor() {
-        return ThreadPool.Names.GENERIC;
+        return ThreadPool.Names.SAME;
     }
 
     @Inject
@@ -349,62 +349,52 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
                                              Consumer<Exception> onFailure) {
-        threadPool.generic().execute(new AbstractRunnable() {
-            @Override
-            protected void doRun() {
-                final RepositoryMetadata repositoryMetadataStart = metadata;
-                getRepositoryData(ActionListener.wrap(repositoryData -> {
-                    final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
-                    clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
+        final RepositoryMetadata repositoryMetadataStart = metadata;
+        getRepositoryData(ActionListener.wrap(repositoryData -> {
+            final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
+            clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
 
-                        private boolean executedTask = false;
+                private boolean executedTask = false;
 
-                        @Override
-                        public ClusterState execute(ClusterState currentState) throws Exception {
-                            // Comparing the full metadata here on purpose instead of simply comparing the safe generation.
-                            // If the safe generation has changed, then we have to reload repository data and start over.
-                            // If the pending generation has changed we are in the midst of a write operation and might pick up the
-                            // updated repository data and state on the retry. We don't want to wait for the write to finish though
-                            // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
-                            // to change in any form.
-                            if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
-                                executedTask = true;
-                                return updateTask.execute(currentState);
-                            }
-                            return currentState;
-                        }
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    // Comparing the full metadata here on purpose instead of simply comparing the safe generation.
+                    // If the safe generation has changed, then we have to reload repository data and start over.
+                    // If the pending generation has changed we are in the midst of a write operation and might pick up the
+                    // updated repository data and state on the retry. We don't want to wait for the write to finish though
+                    // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
+                    // to change in any form.
+                    if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
+                        executedTask = true;
+                        return updateTask.execute(currentState);
+                    }
+                    return currentState;
+                }
 
-                        @Override
-                        public void onFailure(String source, Exception e) {
-                            if (executedTask) {
-                                updateTask.onFailure(source, e);
-                            } else {
-                                onFailure.accept(e);
-                            }
-                        }
+                @Override
+                public void onFailure(String source, Exception e) {
+                    if (executedTask) {
+                        updateTask.onFailure(source, e);
+                    } else {
+                        onFailure.accept(e);
+                    }
+                }
 
-                        @Override
-                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                            if (executedTask) {
-                                updateTask.clusterStateProcessed(source, oldState, newState);
-                            } else {
-                                executeConsistentStateUpdate(createUpdateTask, source, onFailure);
-                            }
-                        }
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    if (executedTask) {
+                        updateTask.clusterStateProcessed(source, oldState, newState);
+                    } else {
+                        executeConsistentStateUpdate(createUpdateTask, source, onFailure);
+                    }
+                }
 
-                        @Override
-                        public TimeValue timeout() {
-                            return updateTask.timeout();
-                        }
-                    });
-                }, onFailure));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                onFailure.accept(e);
-            }
-        });
+                @Override
+                public TimeValue timeout() {
+                    return updateTask.timeout();
+                }
+            });
+        }, onFailure));
     }
 
     // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
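The `executeConsistentStateUpdate` hunk above keeps its optimistic-concurrency scheme: capture the repository metadata before loading, apply the task only if that metadata is unchanged when the cluster-state task executes, and otherwise reload and retry — serving the reload from cache is what makes a retry cheap. A stripped-down sketch of the same compare-then-retry idea over a plain `AtomicReference` (illustrative only, not the Elasticsearch types):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

final class ConsistentUpdater<S> {
    private final AtomicReference<S> state;

    ConsistentUpdater(S initial) {
        this.state = new AtomicReference<>(initial);
    }

    // Apply `task` only if the state is still the one observed at load time; on a
    // mismatch, retry against the fresh state rather than waiting for the
    // in-flight write to finish (it could fail for any number of reasons).
    void updateConsistently(UnaryOperator<S> task) {
        while (true) {
            final S snapshot = state.get();          // analogous to repositoryMetadataStart
            final S updated = task.apply(snapshot);  // compute against that snapshot
            if (state.compareAndSet(snapshot, updated)) {
                return;                              // analogous to executedTask == true
            }
            // Otherwise loop, mirroring the retry in clusterStateProcessed() above.
        }
    }
}
```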
@@ -580,17 +570,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         if (isReadOnly()) {
             listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
         } else {
-            try {
-                final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
-                final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
-                // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
-                // delete an index that was created by another master node after writing this index-N blob.
-                final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
-                doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
-                    SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
-            } catch (Exception ex) {
-                listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, ex));
-            }
+            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
+                @Override
+                protected void doRun() throws Exception {
+                    final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
+                    final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
+                    // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
+                    // delete an index that was created by another master node after writing this index-N blob.
+                    final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
+                    doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
+                        SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e));
+                }
+            });
         }
     }
 
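Note the inversion in `deleteSnapshots` above: since callers no longer pre-fork before calling into the repository, the blocking blob-store listing now dispatches itself to the SNAPSHOT pool and routes failures to the listener. The shape of that dispatch as a self-contained sketch (hypothetical names, JDK types only):

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class ForkingExample {
    // Blocking I/O must not run on the thread that delivered the request (e.g. a
    // cluster-state applier thread), so dispatch it to a dedicated pool and make
    // sure any exception reaches the caller's failure handler instead of vanishing.
    static void runOnPool(ExecutorService snapshotPool, Runnable blockingWork, Consumer<Exception> onFailure) {
        snapshotPool.execute(() -> {
            try {
                blockingWork.run();   // e.g. listing root blobs, then deleting
            } catch (Exception e) {
                onFailure.accept(e);  // deliver failures rather than losing them
            }
        });
    }
}
```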
@@ -1197,6 +1193,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             listener.onFailure(corruptedStateException(null));
             return;
         }
+        final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+        // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
+        // the latest known repository generation
+        if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
+            try {
+                listener.onResponse(repositoryDataFromCachedEntry(cached));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+            return;
+        }
+        // Slow path if we were not able to safely read the repository data from cache
+        threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
+    }
+
+    private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
         // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
         // Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
         // generation repeatedly.
@@ -27,7 +27,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
@@ -1546,17 +1545,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      */
     private void deleteSnapshotsFromRepository(String repoName, Collection<SnapshotId> snapshotIds, @Nullable ActionListener<Void> listener,
                                                long repositoryStateId, Version minNodeVersion) {
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
-            Repository repository = repositoriesService.repository(repoName);
-            repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds,
-                repositoryStateId,
-                minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
-                ActionListener.wrap(v -> {
-                    logger.info("snapshots {} deleted", snapshotIds);
-                    removeSnapshotDeletionFromClusterState(snapshotIds, null, l);
-                }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)
-            )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)));
-        }));
+        Repository repository = repositoriesService.repository(repoName);
+        repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(
+            snapshotIds,
+            repositoryStateId,
+            minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
+            ActionListener.wrap(v -> {
+                logger.info("snapshots {} deleted", snapshotIds);
+                removeSnapshotDeletionFromClusterState(snapshotIds, null, listener);
+            }, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener)
+        )), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, listener)));
     }
 
     /**
@@ -133,16 +133,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
     }
 
     protected RepositoryData getRepositoryData(String repository) {
-        return getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository));
+        return getRepositoryData(internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repository));
     }
 
     protected RepositoryData getRepositoryData(Repository repository) {
-        ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
-        final PlainActionFuture<RepositoryData> repositoryData = PlainActionFuture.newFuture();
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
-            repository.getRepositoryData(repositoryData);
-        });
-        return repositoryData.actionGet();
+        return PlainActionFuture.get(repository::getRepositoryData);
     }
 
     public static long getFailureCount(String repository) {
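The test-helper hunk above relies on `PlainActionFuture.get` to turn a callback-taking method into a blocking call without picking a thread pool. A JDK-only analogue of that adapter, since `PlainActionFuture` is internal to Elasticsearch (names here are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class BlockingAdapter {
    // Adapt a callback-style API into a blocking call for use in tests: hand the
    // async method a callback that completes a future, then block on the future.
    static <T> T get(Consumer<Consumer<T>> asyncCall) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(future::complete);  // the callback completes the future
        return future.join();                // block the test thread until done
    }
}
```

With an adapter like this, a test can use a `get(repository::getRepositoryData)`-style one-liner, which mirrors the simplification applied in both test hunks of this commit.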