diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 5753f17c351..e056e312819 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -103,9 +103,8 @@ public final class CcrLicenseChecker { * @param leaderIndex the name of the leader index * @param onFailure the failure consumer * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards - * @param the type of response the listener is waiting for */ - public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( + public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( final Client client, final String clusterAlias, final String leaderIndex, @@ -118,8 +117,8 @@ public final class CcrLicenseChecker { request.indices(leaderIndex); checkRemoteClusterLicenseAndFetchClusterState( client, - Collections.emptyMap(), clusterAlias, + client.getRemoteClusterClient(clusterAlias), request, onFailure, leaderClusterState -> { @@ -151,22 +150,20 @@ public final class CcrLicenseChecker { * * @param client the client * @param clusterAlias the remote cluster alias - * @param headers the headers to use for leader client * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer */ public void checkRemoteClusterLicenseAndFetchClusterState( final Client client, - final Map headers, final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer) { checkRemoteClusterLicenseAndFetchClusterState( client, - headers, clusterAlias, + systemClient(client.getRemoteClusterClient(clusterAlias)), request, onFailure, leaderClusterStateConsumer, @@ -182,18 +179,17 @@ public final class CcrLicenseChecker { * * @param client the client * @param clusterAlias the remote cluster alias - * @param headers the headers to use for leader client + * @param leaderClient the leader client to use to execute cluster state API * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer * @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant * @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure - * @param the type of response the listener is waiting for */ - private void checkRemoteClusterLicenseAndFetchClusterState( + private void checkRemoteClusterLicenseAndFetchClusterState( final Client client, - final Map headers, final String clusterAlias, + final Client leaderClient, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer, @@ -207,7 +203,6 @@ public final class CcrLicenseChecker { @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { - final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers); final ActionListener clusterStateListener = ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata @@ -361,6 +356,21 @@ public final class CcrLicenseChecker { } } + private static Client systemClient(Client client) { + final ThreadContext threadContext = client.threadPool().getThreadContext(); + return new FilterClient(client) { + @Override + protected + void doExecute(Action action, Request request, ActionListener listener) { + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + }; + } + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { final ThreadContext.StoredContext storedContext = threadContext.stashContext(); threadContext.copyHeaders(headers.entrySet()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index b32ed829cf4..6323fb7f103 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -159,8 +159,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { AutoFollower operation = new AutoFollower(handler, followerClusterState) { @Override - void getLeaderClusterState(final Map headers, - final String remoteCluster, + void getLeaderClusterState(final String remoteCluster, final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -168,7 +167,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier { // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, - headers, remoteCluster, request, e -> handler.accept(null, e), @@ -249,7 +247,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { final String remoteCluster = autoFollowPattern.getRemoteCluster(); Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> { + getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); @@ -413,13 +411,10 @@ public class AutoFollowCoordinator implements ClusterStateApplier { /** * Fetch the cluster state from the leader with the specified cluster alias - * - * @param headers the client headers * @param remoteCluster the name of the leader cluster * @param handler the callback to invoke */ abstract void getLeaderClusterState( - Map headers, String remoteCluster, BiConsumer handler ); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index 5c1a8324f7c..2562882bc29 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -105,20 +105,12 @@ public final class TransportPutFollowAction client.getRemoteClusterClient(remoteCluster); String leaderIndex = request.getLeaderIndex(); - createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener); - } - - private void createFollowerIndexAndFollowRemoteIndex( - final PutFollowAction.Request request, - final String remoteCluster, - final String leaderIndex, - final ActionListener listener) { ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( - client, - remoteCluster, - leaderIndex, - listener::onFailure, - (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); + client, + remoteCluster, + leaderIndex, + listener::onFailure, + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); } private void createFollowerIndex( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 6b542d15044..1da58cc2703 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -83,10 +83,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; AutoFollower autoFollower = new AutoFollower(handler, currentState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { - assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); + assertThat(remoteCluster, equalTo("remote")); handler.accept(leaderState, null); } @@ -143,8 +142,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(null, failure); } @@ -204,8 +202,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(leaderState, null); } @@ -267,8 +264,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; AutoFollower autoFollower = new AutoFollower(handler, followerState) { @Override - void getLeaderClusterState(Map headers, - String remoteCluster, + void getLeaderClusterState(String remoteCluster, BiConsumer handler) { handler.accept(leaderState, null); }