[CCR] Rename leaderClient variables and parameters to remoteClient (#35368)
This commit is contained in:
parent
54b445d74b
commit
07a69a528b
|
@ -128,10 +128,10 @@ public final class CcrLicenseChecker {
|
|||
return;
|
||||
}
|
||||
|
||||
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
|
||||
hasPrivilegesToFollowIndices(leaderClient, new String[] {leaderIndex}, e -> {
|
||||
final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
|
||||
hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> {
|
||||
if (e == null) {
|
||||
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs ->
|
||||
fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, onFailure, historyUUIDs ->
|
||||
consumer.accept(historyUUIDs, leaderIndexMetaData));
|
||||
} else {
|
||||
onFailure.accept(e);
|
||||
|
@ -179,7 +179,7 @@ public final class CcrLicenseChecker {
|
|||
*
|
||||
* @param client the client
|
||||
* @param clusterAlias the remote cluster alias
|
||||
* @param leaderClient the leader client to use to execute cluster state API
|
||||
* @param remoteClient the remote client to use to execute cluster state API
|
||||
* @param request the cluster state request
|
||||
* @param onFailure the failure consumer
|
||||
* @param leaderClusterStateConsumer the leader cluster state consumer
|
||||
|
@ -189,7 +189,7 @@ public final class CcrLicenseChecker {
|
|||
private void checkRemoteClusterLicenseAndFetchClusterState(
|
||||
final Client client,
|
||||
final String clusterAlias,
|
||||
final Client leaderClient,
|
||||
final Client remoteClient,
|
||||
final ClusterStateRequest request,
|
||||
final Consumer<Exception> onFailure,
|
||||
final Consumer<ClusterState> leaderClusterStateConsumer,
|
||||
|
@ -206,7 +206,7 @@ public final class CcrLicenseChecker {
|
|||
final ActionListener<ClusterStateResponse> clusterStateListener =
|
||||
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
|
||||
// following an index in remote cluster, so use remote client to fetch leader index metadata
|
||||
leaderClient.admin().cluster().state(request, clusterStateListener);
|
||||
remoteClient.admin().cluster().state(request, clusterStateListener);
|
||||
} else {
|
||||
onFailure.accept(nonCompliantLicense.apply(licenseCheck));
|
||||
}
|
||||
|
@ -221,9 +221,9 @@ public final class CcrLicenseChecker {
|
|||
}
|
||||
|
||||
/**
|
||||
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient.
|
||||
* Fetches the history UUIDs for leader index on per shard basis using the specified remoteClient.
|
||||
*
|
||||
* @param leaderClient the leader client
|
||||
* @param remoteClient the remote client
|
||||
* @param leaderIndexMetaData the leader index metadata
|
||||
* @param onFailure the failure consumer
|
||||
* @param historyUUIDConsumer the leader index history uuid and consumer
|
||||
|
@ -231,7 +231,7 @@ public final class CcrLicenseChecker {
|
|||
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
|
||||
// in case of following a local or a remote cluster.
|
||||
public void fetchLeaderHistoryUUIDs(
|
||||
final Client leaderClient,
|
||||
final Client remoteClient,
|
||||
final IndexMetaData leaderIndexMetaData,
|
||||
final Consumer<Exception> onFailure,
|
||||
final Consumer<String[]> historyUUIDConsumer) {
|
||||
|
@ -274,7 +274,7 @@ public final class CcrLicenseChecker {
|
|||
IndicesStatsRequest request = new IndicesStatsRequest();
|
||||
request.clear();
|
||||
request.indices(leaderIndex);
|
||||
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
|
||||
remoteClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -282,12 +282,12 @@ public final class CcrLicenseChecker {
|
|||
* client. The specified callback will be invoked with null if the user has the necessary privileges to follow the specified indices,
|
||||
* otherwise the callback will be invoked with an exception outlining the authorization error.
|
||||
*
|
||||
* @param leaderClient the leader client
|
||||
* @param remoteClient the remote client
|
||||
* @param indices the indices
|
||||
* @param handler the callback
|
||||
*/
|
||||
public void hasPrivilegesToFollowIndices(final Client leaderClient, final String[] indices, final Consumer<Exception> handler) {
|
||||
Objects.requireNonNull(leaderClient, "leaderClient");
|
||||
public void hasPrivilegesToFollowIndices(final Client remoteClient, final String[] indices, final Consumer<Exception> handler) {
|
||||
Objects.requireNonNull(remoteClient, "remoteClient");
|
||||
Objects.requireNonNull(indices, "indices");
|
||||
if (indices.length == 0) {
|
||||
throw new IllegalArgumentException("indices must not be empty");
|
||||
|
@ -298,7 +298,7 @@ public final class CcrLicenseChecker {
|
|||
return;
|
||||
}
|
||||
|
||||
ThreadContext threadContext = leaderClient.threadPool().getThreadContext();
|
||||
ThreadContext threadContext = remoteClient.threadPool().getThreadContext();
|
||||
SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
|
||||
String username = securityContext.getUser().principal();
|
||||
|
||||
|
@ -332,7 +332,7 @@ public final class CcrLicenseChecker {
|
|||
handler.accept(Exceptions.authorizationError(message.toString()));
|
||||
}
|
||||
};
|
||||
leaderClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
|
||||
remoteClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
|
||||
}
|
||||
|
||||
public static Client wrapClient(Client client, Map<String, String> headers) {
|
||||
|
|
|
@ -91,11 +91,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
|
||||
Map<String, String> headers) {
|
||||
ShardFollowTask params = taskInProgress.getParams();
|
||||
final Client leaderClient;
|
||||
final Client remoteClient;
|
||||
if (params.getRemoteCluster() != null) {
|
||||
leaderClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
|
||||
remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
|
||||
} else {
|
||||
leaderClient = wrapClient(client, params.getHeaders());
|
||||
remoteClient = wrapClient(client, params.getHeaders());
|
||||
}
|
||||
Client followerClient = wrapClient(client, params.getHeaders());
|
||||
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
|
||||
|
@ -124,7 +124,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
clusterStateRequest.metaData(true);
|
||||
clusterStateRequest.indices(leaderIndex.getName());
|
||||
|
||||
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
|
||||
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
|
||||
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
|
||||
if (indexMetaData.getMappings().isEmpty()) {
|
||||
assert indexMetaData.getMappingVersion() == 1;
|
||||
|
@ -186,7 +186,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
}
|
||||
}
|
||||
};
|
||||
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
|
||||
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
|
||||
}
|
||||
|
||||
private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
|
||||
|
@ -240,7 +240,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
|
|||
request.setMaxOperationCount(maxOperationCount);
|
||||
request.setMaxBatchSize(params.getMaxReadRequestSize());
|
||||
request.setPollTimeout(params.getReadPollTimeout());
|
||||
leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
|
||||
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -74,7 +74,7 @@ public class TransportPutAutoFollowPatternAction extends
|
|||
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
|
||||
return;
|
||||
}
|
||||
final Client leaderClient = client.getRemoteClusterClient(request.getRemoteCluster());
|
||||
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster());
|
||||
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
|
||||
clusterStateRequest.clear();
|
||||
clusterStateRequest.metaData(true);
|
||||
|
@ -84,9 +84,9 @@ public class TransportPutAutoFollowPatternAction extends
|
|||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
|
||||
ccrLicenseChecker.hasPrivilegesToFollowIndices(leaderClient, indices, e -> {
|
||||
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
|
||||
if (e == null) {
|
||||
leaderClient.admin().cluster().state(
|
||||
remoteClient.admin().cluster().state(
|
||||
clusterStateRequest,
|
||||
ActionListener.wrap(
|
||||
clusterStateResponse -> {
|
||||
|
|
Loading…
Reference in New Issue