[CCR] Make auto follow patterns work with security (#33501)

Relates to #33007
This commit is contained in:
Martijn van Groningen 2018-09-17 07:29:00 +02:00 committed by GitHub
parent 3046656ab1
commit 481f8a9a07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 252 additions and 104 deletions

View File

@ -2,7 +2,7 @@ ccruser:
cluster:
- manage_ccr
indices:
- names: [ 'allowed-index' ]
- names: [ 'allowed-index', 'logs-eu-*' ]
privileges:
- monitor
- read

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@ -119,6 +120,45 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
}
}
public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
String allowedIndex = "logs-eu-20190101";
String disallowedIndex = "logs-us-20190101";
Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
assertOK(client().performRequest(request));
try (RestClient leaderClient = buildLeaderClient()) {
for (String index : new String[]{allowedIndex, disallowedIndex}) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
String requestBody = "{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }";
request = new Request("PUT", "/" + index);
request.setJsonEntity(requestBody);
assertOK(leaderClient.performRequest(request));
for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, index, id, "field", i, "filtered_field", "true");
}
}
}
assertBusy(() -> {
ensureYellow(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, 5);
});
assertThat(indexExists(adminClient(), disallowedIndex), is(false));
// Cleanup by deleting auto follow pattern and unfollowing:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
unfollowIndex(allowedIndex);
}
private int countCcrNodeTasks() throws IOException {
final Request request = new Request("GET", "/_tasks");
request.addParameter("detailed", "true");
@ -139,6 +179,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
}
private static void index(String index, String id, Object... fields) throws IOException {
index(adminClient(), index, id, fields);
}
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
@ -146,7 +190,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
document.endObject();
final Request request = new Request("POST", "/" + index + "/_doc/" + id);
request.setJsonEntity(Strings.toString(document));
assertOK(adminClient().performRequest(request));
assertOK(client.performRequest(request));
}
private static void refresh(String index) throws IOException {
@ -201,11 +245,34 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertOK(adminClient().performRequest(request));
}
private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}
private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
int portSeparator = leaderUrl.lastIndexOf(':');
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
}
private static boolean indexExists(RestClient client, String index) throws IOException {
Response response = client.performRequest(new Request("HEAD", "/" + index));
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}
private static void unfollowIndex(String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/unfollow")));
}
private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*");
@ -239,14 +306,4 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
}
private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
}
}

View File

@ -7,17 +7,23 @@
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
@ -25,15 +31,19 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Encapsulates licensing checking for CCR.
@ -93,6 +103,7 @@ public final class CcrLicenseChecker {
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
Collections.emptyMap(),
clusterAlias,
request,
onFailure,
@ -115,6 +126,7 @@ public final class CcrLicenseChecker {
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
@ -122,12 +134,14 @@ public final class CcrLicenseChecker {
*/
public <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
clusterAlias,
request,
onFailure,
@ -144,6 +158,7 @@ public final class CcrLicenseChecker {
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
@ -153,6 +168,7 @@ public final class CcrLicenseChecker {
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
@ -167,7 +183,7 @@ public final class CcrLicenseChecker {
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
@ -237,6 +253,33 @@ public final class CcrLicenseChecker {
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}
public static Client wrapClient(Client client, Map<String, String> headers) {
if (headers.isEmpty()) {
return client;
} else {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Map<String, String> filteredHeaders = headers.entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}
}
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();

View File

@ -103,19 +103,22 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
@Override
void getLeaderClusterState(final String leaderClusterAlias, final BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(final Map<String, String> headers,
final String leaderClusterAlias,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
if ("_local_".equals(leaderClusterAlias)) {
Client client = CcrLicenseChecker.wrapClient(AutoFollowCoordinator.this.client, headers);
client.admin().cluster().state(
request, ActionListener.wrap(r -> handler.accept(r.getState(), null), e -> handler.accept(null, e)));
} else {
final Client leaderClient = client.getRemoteClusterClient(leaderClusterAlias);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
leaderClient,
client,
headers,
leaderClusterAlias,
request,
e -> handler.accept(null, e),
@ -125,15 +128,22 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
}
@Override
void createAndFollow(FollowIndexAction.Request followRequest,
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
client.execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest),
ActionListener.wrap(r -> successHandler.run(), failureHandler));
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(followRequest);
followerClient.execute(
CreateAndFollowIndexAction.INSTANCE,
request,
ActionListener.wrap(r -> successHandler.run(), failureHandler)
);
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {
@Override
@ -188,7 +198,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
AutoFollowPattern autoFollowPattern = entry.getValue();
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
getLeaderClusterState(clusterAlias, (leaderClusterState, e) -> {
getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
@ -251,7 +261,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
finalise(followError);
}
};
createAndFollow(followRequest, successHandler, failureHandler);
createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
}
}
}
@ -314,14 +324,27 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
/**
* Fetch the cluster state from the leader with the specified cluster alias
*
* @param headers the client headers
* @param leaderClusterAlias the cluster alias of the leader
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler);
abstract void getLeaderClusterState(
Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler
);
abstract void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler);
abstract void createAndFollow(
Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);
abstract void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler);
abstract void updateAutoFollowMetadata(
Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler
);
}
}

View File

@ -5,18 +5,13 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -24,7 +19,6 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.SeqNoStats;
@ -48,8 +42,8 @@ import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
@ -86,11 +80,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
ShardFollowTask params = taskInProgress.getParams();
final Client leaderClient;
if (params.getLeaderClusterAlias() != null) {
leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params);
leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params.getHeaders());
} else {
leaderClient = wrapClient(client, params);
leaderClient = wrapClient(client, params.getHeaders());
}
Client followerClient = wrapClient(client, params);
Client followerClient = wrapClient(client, params.getHeaders());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
try {
threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
@ -160,7 +154,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) {
Client followerClient = wrapClient(client, params);
Client followerClient = wrapClient(client, params.getHeaders());
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
@ -189,31 +183,4 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}, errorHandler));
}
private static Client wrapClient(Client client, ShardFollowTask shardFollowTask) {
if (shardFollowTask.getHeaders().isEmpty()) {
return client;
} else {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Map<String, String> filteredHeaders = shardFollowTask.getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}
}
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}
}

View File

@ -87,6 +87,10 @@ public class TransportPutAutoFollowPatternAction extends
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
leaderClient.admin().cluster().state(
clusterStateRequest,
ActionListener.wrap(
@ -102,7 +106,7 @@ public class TransportPutAutoFollowPatternAction extends
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, currentState, leaderClusterState);
return innerPut(request, filteredHeaders, currentState, leaderClusterState);
}
});
},
@ -110,6 +114,7 @@ public class TransportPutAutoFollowPatternAction extends
}
static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
Map<String, String> filteredHeaders,
ClusterState localState,
ClusterState leaderClusterState) {
// auto patterns are always overwritten
@ -151,8 +156,8 @@ public class TransportPutAutoFollowPatternAction extends
request.getMaxConcurrentWriteBatches(),
request.getMaxWriteBufferSize(),
request.getMaxRetryDelay(),
request.getIdleShardRetryDelay()
);
request.getIdleShardRetryDelay(),
filteredHeaders);
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);
ClusterState.Builder newState = ClusterState.builder(localState);
newState.metaData(MetaData.builder(localState.getMetaData())

View File

@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -37,10 +38,17 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase<AutoFol
Map<String, List<String>> followedLeaderIndices = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
List<String> leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false));
AutoFollowMetadata.AutoFollowPattern autoFollowPattern =
new AutoFollowMetadata.AutoFollowPattern(leaderPatterns, randomAlphaOfLength(4), randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500));
AutoFollowMetadata.AutoFollowPattern autoFollowPattern = new AutoFollowMetadata.AutoFollowPattern(
leaderPatterns,
randomAlphaOfLength(4),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
randomNonNegativeLong(),
randomIntBetween(0, Integer.MAX_VALUE),
randomIntBetween(0, Integer.MAX_VALUE),
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(500),
randomBoolean() ? null : Collections.singletonMap("key", "value"));
configs.put(Integer.toString(i), autoFollowPattern);
followedLeaderIndices.put(Integer.toString(i), Arrays.asList(generateRandomStringArray(4, 4, false)));
}

View File

@ -140,7 +140,7 @@ public class CcrLicenseIT extends ESSingleNodeTestCase {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
Collections.singletonMap("test_alias", autoFollowPattern),
Collections.emptyMap()

View File

@ -51,7 +51,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -69,19 +69,25 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
};
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
@Override
void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
@Override
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
ClusterState resultCs = updateFunction.apply(currentState);
AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE);
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
@ -98,7 +104,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -116,17 +122,23 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(null, failure);
}
@Override
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
fail("should not get here");
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
fail("should not get here");
}
};
@ -146,7 +158,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -164,12 +176,17 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
@Override
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
successHandler.run();
@ -196,7 +213,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
.build();
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
@ -214,19 +231,25 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(String leaderClusterAlias, BiConsumer<ClusterState, Exception> handler) {
void getLeaderClusterState(Map<String, String> headers,
String leaderClusterAlias,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
@Override
void createAndFollow(FollowIndexAction.Request followRequest, Runnable successHandler, Consumer<Exception> failureHandler) {
void createAndFollow(Map<String, String> headers,
FollowIndexAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
failureHandler.accept(failure);
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
fail("should not get here");
}
};
@ -236,7 +259,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testGetLeaderIndicesToFollow() {
AutoFollowPattern autoFollowPattern =
new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null);
new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null);
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap())))
@ -282,15 +305,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testGetFollowerIndexName() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0"));
autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null,
null, null, null, null, null);
null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0"));
}

View File

@ -30,7 +30,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("eu_cluster",
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -40,7 +40,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("logs-*");
existingAutoFollowPatterns.put("asia_cluster",
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -69,7 +69,7 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("eu_cluster",
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
}
ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,

View File

@ -39,7 +39,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
.metaData(MetaData.builder())
.build();
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState);
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
@ -78,7 +78,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
.metaData(mdBuilder)
.build();
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState);
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));
@ -97,7 +97,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
List<String> existingPatterns = new ArrayList<>();
existingPatterns.add("transactions-*");
existingAutoFollowPatterns.put("eu_cluster",
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null));
new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null));
Map<String, List<String>> existingAlreadyFollowedIndexUUIDS = new HashMap<>();
List<String> existingUUIDS = new ArrayList<>();
existingUUIDS.add("_val");
@ -120,7 +120,7 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase {
.metaData(mdBuilder)
.build();
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, localState, remoteState);
ClusterState result = TransportPutAutoFollowPatternAction.innerPut(request, null, localState, remoteState);
AutoFollowMetadata autoFollowMetadata = result.metaData().custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
assertThat(autoFollowMetadata.getPatterns().size(), equalTo(1));

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.security.xcontent.XContentUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@ -171,12 +172,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
private static final ParseField HEADERS = new ParseField("headers");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<AutoFollowPattern, Void> PARSER =
new ConstructingObjectParser<>("auto_follow_pattern",
args -> new AutoFollowPattern((List<String>) args[0], (String) args[1], (Integer) args[2], (Integer) args[3],
(Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]));
(Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8],
(Map<String, String>) args[9]));
static {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD);
@ -192,6 +195,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}
private final List<String> leaderIndexPatterns;
@ -203,10 +207,18 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
private final Integer maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;
private final Map<String, String> headers;
public AutoFollowPattern(List<String> leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount,
Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay) {
public AutoFollowPattern(List<String> leaderIndexPatterns,
String followIndexPattern,
Integer maxBatchOperationCount,
Integer maxConcurrentReadBatches,
Long maxOperationSizeInBytes,
Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize,
TimeValue maxRetryDelay,
TimeValue idleShardRetryDelay,
Map<String, String> headers) {
this.leaderIndexPatterns = leaderIndexPatterns;
this.followIndexPattern = followIndexPattern;
this.maxBatchOperationCount = maxBatchOperationCount;
@ -216,6 +228,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
AutoFollowPattern(StreamInput in) throws IOException {
@ -228,6 +241,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
maxWriteBufferSize = in.readOptionalVInt();
maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
public boolean match(String indexName) {
@ -274,6 +288,10 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
return idleShardRetryDelay;
}
public Map<String, String> getHeaders() {
return headers;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringList(leaderIndexPatterns);
@ -285,6 +303,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
@Override
@ -314,6 +333,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
if (idleShardRetryDelay != null) {
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay);
}
builder.field(HEADERS.getPreferredName(), headers);
return builder;
}
@ -335,7 +355,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay);
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
Objects.equals(headers, that.headers);
}
@Override
@ -349,7 +370,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
maxConcurrentWriteBatches,
maxWriteBufferSize,
maxRetryDelay,
idleShardRetryDelay
idleShardRetryDelay,
headers
);
}
}