[CCR] Make cross cluster replication work with security (#30239)

If security is enabled today with ccr then the follow index api will
fail with the fact that system user does not have privileges to use
the shard changes api. The reason that system user is used is because
the persistent tasks that keep the shards in sync runs in the background
and the user that invokes the follow index api only start those background
processes.

I think it is better that the system user isn't used by the persistent
tasks that keep shards in sync, but rather runs as the same user that
invoked the follow index api and use the permissions that that user has.
This is what this PR does, and this is done by keeping track of
security headers inside  the persistent task (similar to how rollup does this).

This PR also adds a cluster ccr priviledge that allows a user to follow
or unfollow an index. Finally if a user that wants to follow an index,
it needs to have read and monitor privileges on the leader index and
monitor and write privileges on the follow index.
This commit is contained in:
Martijn van Groningen 2018-05-16 07:48:32 +02:00 committed by GitHub
parent b12c2f61c5
commit 64b97313d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 373 additions and 20 deletions

View File

@ -16,6 +16,8 @@ archivesBaseName = 'x-pack-ccr'
integTest.enabled = false
compileJava.options.compilerArgs << "-Xlint:-try"
// Instead we create a separate task to run the
// tests based on ESIntegTestCase
task internalClusterTest(type: RandomizedTestingTask,
@ -30,7 +32,7 @@ task internalClusterTest(type: RandomizedTestingTask,
}
check {
dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest']
dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest']
}
internalClusterTest.mustRunAfter test

View File

@ -0,0 +1,74 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
dependencies {
testCompile project(path: xpackModule('core'), configuration: 'runtime')
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
}
task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}
leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'false'
extraConfigFile 'roles.yml', 'roles.yml'
setupCommand 'setupTestAdmin',
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
setupCommand 'setupCcrUser',
'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "manage_ccr"
waitCondition = { node, ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
dest: tmpFile.toString(),
username: 'test_admin',
password: 'x-pack-test-password',
ignoreerrors: true,
retries: 10)
return tmpFile.exists()
}
}
leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}
task followClusterTest(type: RestIntegTestTask) {}
followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
setting 'xpack.monitoring.enabled', 'false'
extraConfigFile 'roles.yml', 'roles.yml'
setupCommand 'setupTestAdmin',
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
setupCommand 'setupCcrUser',
'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "ccruser"
waitCondition = { node, ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
dest: tmpFile.toString(),
username: 'test_admin',
password: 'x-pack-test-password',
ignoreerrors: true,
retries: 10)
return tmpFile.exists()
}
}
followClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test

View File

@ -0,0 +1,9 @@
ccruser:
cluster:
- manage_ccr
indices:
- names: [ 'index1' ]
privileges:
- monitor
- read
- write

View File

@ -0,0 +1,187 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
public class FollowIndexSecurityIT extends ESRestTestCase {
private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));
@Override
protected Settings restClientSettings() {
String token = basicAuthHeaderValue("test_ccr", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
@Override
protected Settings restAdminSettings() {
String token = basicAuthHeaderValue("test_admin", new SecureString("x-pack-test-password".toCharArray()));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
public void testFollowIndex() throws Exception {
final int numDocs = 16;
final String indexName1 = "index1";
final String indexName2 = "index2";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(indexName1, Integer.toString(i), "field", i);
}
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(indexName2, Integer.toString(i), "field", i);
}
refresh(indexName1);
verifyDocuments(adminClient(), indexName1, numDocs);
} else {
logger.info("Running against follow cluster");
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(indexName1, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}");
ensureYellow(indexName1);
followIndex("leader_cluster:" + indexName1, indexName1);
assertBusy(() -> verifyDocuments(client(), indexName1, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest("POST", "/_xpack/ccr/" + indexName1 + "/_unfollow"));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Map<String, Object> clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state"));
List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});
// TODO: remove mapping here when ccr syncs mappings too
createIndex(indexName2, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}");
ensureYellow(indexName2);
followIndex("leader_cluster:" + indexName2, indexName2);
// Verify that nothing has been replicated and no node tasks are running
// These node tasks should have been failed due to the fact that the user
// has no sufficient priviledges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), indexName2, 0);
}
}
private int countCcrNodeTasks() throws IOException {
Map<String, Object> rsp1 = toMap(adminClient().performRequest("GET", "/_tasks",
Collections.singletonMap("detailed", "true")));
Map<?, ?> nodes = (Map<?, ?>) rsp1.get("nodes");
assertThat(nodes.size(), equalTo(1));
Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
Map<?, ?> nodeTasks = (Map<?, ?>) node.get("tasks");
int numNodeTasks = 0;
for (Map.Entry<?, ?> entry : nodeTasks.entrySet()) {
Map<?, ?> nodeTask = (Map<?, ?>) entry.getValue();
String action = (String) nodeTask.get("action");
if (action.startsWith("shard_follow")) {
numNodeTasks++;
}
}
return numNodeTasks;
}
private static void index(String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
assertOK(adminClient().performRequest("POST", "/" + index + "/doc/" + id, emptyMap(),
new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON)));
}
private static void refresh(String index) throws IOException {
assertOK(adminClient().performRequest("POST", "/" + index + "/_refresh"));
}
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params));
}
void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs));
params.put("sort", "field:asc");
params.put("pretty", "true");
Map<String, ?> response = toMap(client.performRequest("GET", "/" + index + "/_search", params));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, equalTo(expectedNumDocs));
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), equalTo(expectedNumDocs));
for (int i = 0; i < expectedNumDocs; i++) {
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
assertThat(i, equalTo(value));
}
}
private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
protected static void createIndex(String name, Settings settings, String mapping) throws IOException {
assertOK(adminClient().performRequest(HttpPut.METHOD_NAME, name, Collections.emptyMap(),
new StringEntity("{ \"settings\": " + Strings.toString(settings)
+ ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON)));
}
private static void ensureYellow(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "yellow");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "30s");
params.put("level", "shards");
assertOK(adminClient().performRequest("GET", "_cluster/health/" + index, params));
}
}

View File

@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.Request,
FollowExistingIndexAction.Response, FollowExistingIndexAction.RequestBuilder> {
@ -242,13 +243,16 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override

View File

@ -42,7 +42,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class ShardChangesAction extends Action<ShardChangesAction.Request, ShardChangesAction.Response, ShardChangesAction.RequestBuilder> {
public static final ShardChangesAction INSTANCE = new ShardChangesAction();
public static final String NAME = "cluster:admin/xpack/ccr/shard_changes";
public static final String NAME = "indices:data/read/xpack/ccr/shard_changes";
private ShardChangesAction() {
super(NAME);

View File

@ -10,20 +10,27 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.persistent.PersistentTaskParams;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class ShardFollowTask implements PersistentTaskParams {
public static final String NAME = "shard_follow";
// list of headers that will be stored when a job is created
public static final Set<String> HEADER_FILTERS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")));
static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias");
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
@ -31,13 +38,16 @@ public class ShardFollowTask implements PersistentTaskParams {
static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index");
static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid");
static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard");
static final ParseField HEADERS = new ParseField("headers");
public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size");
public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks");
public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes");
@SuppressWarnings("unchecked")
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9]));
new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9],
(Map<String, String>) a[10]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
@ -50,6 +60,7 @@ public class ShardFollowTask implements PersistentTaskParams {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}
private final String leaderClusterAlias;
@ -58,15 +69,17 @@ public class ShardFollowTask implements PersistentTaskParams {
private final long maxChunkSize;
private final int numConcurrentChunks;
private final long processorMaxTranslogBytes;
private final Map<String, String> headers;
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize,
int numConcurrentChunks, long processorMaxTranslogBytes) {
int numConcurrentChunks, long processorMaxTranslogBytes, Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.maxChunkSize = maxChunkSize;
this.numConcurrentChunks = numConcurrentChunks;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}
public ShardFollowTask(StreamInput in) throws IOException {
@ -76,6 +89,7 @@ public class ShardFollowTask implements PersistentTaskParams {
this.maxChunkSize = in.readVLong();
this.numConcurrentChunks = in.readVInt();
this.processorMaxTranslogBytes = in.readVLong();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}
public String getLeaderClusterAlias() {
@ -102,6 +116,10 @@ public class ShardFollowTask implements PersistentTaskParams {
return processorMaxTranslogBytes;
}
public Map<String, String> getHeaders() {
return headers;
}
@Override
public String getWriteableName() {
return NAME;
@ -115,6 +133,7 @@ public class ShardFollowTask implements PersistentTaskParams {
out.writeVLong(maxChunkSize);
out.writeVInt(numConcurrentChunks);
out.writeVLong(processorMaxTranslogBytes);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}
public static ShardFollowTask fromXContent(XContentParser parser) {
@ -136,6 +155,7 @@ public class ShardFollowTask implements PersistentTaskParams {
builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize);
builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks);
builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes);
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
}
@ -149,13 +169,14 @@ public class ShardFollowTask implements PersistentTaskParams {
Objects.equals(leaderShardId, that.leaderShardId) &&
maxChunkSize == that.maxChunkSize &&
numConcurrentChunks == that.numConcurrentChunks &&
processorMaxTranslogBytes == that.processorMaxTranslogBytes;
processorMaxTranslogBytes == that.processorMaxTranslogBytes &&
Objects.equals(headers, that.headers);
}
@Override
public int hashCode() {
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks,
processorMaxTranslogBytes);
processorMaxTranslogBytes, headers);
}
public String toString() {

View File

@ -8,11 +8,17 @@ package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.logging.Loggers;
@ -21,6 +27,7 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -44,6 +51,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
@ -89,17 +98,19 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
Client leaderClient = params.getLeaderClusterAlias() != null ?
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client;
Client leaderClient = wrapClient(params.getLeaderClusterAlias() != null ?
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client, params);
Client followerClient = wrapClient(this.client, params);
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(client, params.getFollowShardId(),
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
followGlobalCheckPoint -> {
shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint);
prepare(leaderClient, followerClient,shardFollowNodeTask, params, followGlobalCheckPoint);
}, task::markAsFailed);
}
void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
@ -111,7 +122,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, task, params, followGlobalCheckPoint);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@ -121,12 +132,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, task, params, leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(),
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
@ -134,7 +145,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}, task::markAsFailed);
}
private void retry(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -143,7 +155,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
prepare(leaderClient, task, params, followGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint);
}
});
}
@ -362,4 +374,34 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
static Client wrapClient(Client client, ShardFollowTask shardFollowTask) {
if (shardFollowTask.getHeaders().isEmpty()) {
return client;
} else {
final ThreadContext threadContext = client.threadPool().getThreadContext();
Map<String, String> filteredHeaders = shardFollowTask.getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new FilterClient(client) {
@Override
protected <
Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}
}
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
return storedContext;
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Collections;
public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollowTask> {
@ -26,7 +27,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE),
randomIntBetween(1, Integer.MAX_VALUE));
randomIntBetween(1, Integer.MAX_VALUE), randomBoolean() ? null : Collections.singletonMap("key", "value"));
}
@Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.security.authz.privilege;
import org.apache.lucene.util.automaton.Automaton;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.xpack.core.security.action.token.InvalidateTokenAction;
@ -41,6 +42,7 @@ public final class ClusterPrivilege extends Privilege {
private static final Automaton MANAGE_IDX_TEMPLATE_AUTOMATON = patterns("indices:admin/template/*");
private static final Automaton MANAGE_INGEST_PIPELINE_AUTOMATON = patterns("cluster:admin/ingest/pipeline/*");
private static final Automaton MANAGE_ROLLUP_AUTOMATON = patterns("cluster:admin/xpack/rollup/*", "cluster:monitor/xpack/rollup/*");
private static final Automaton MANAGE_CCR_AUTOMATON = patterns("cluster:admin/xpack/ccr/*", ClusterStateAction.NAME);
public static final ClusterPrivilege NONE = new ClusterPrivilege("none", Automatons.EMPTY);
public static final ClusterPrivilege ALL = new ClusterPrivilege("all", ALL_CLUSTER_AUTOMATON);
@ -60,6 +62,7 @@ public final class ClusterPrivilege extends Privilege {
public static final ClusterPrivilege MANAGE_SECURITY = new ClusterPrivilege("manage_security", MANAGE_SECURITY_AUTOMATON);
public static final ClusterPrivilege MANAGE_SAML = new ClusterPrivilege("manage_saml", MANAGE_SAML_AUTOMATON);
public static final ClusterPrivilege MANAGE_PIPELINE = new ClusterPrivilege("manage_pipeline", "cluster:admin/ingest/pipeline/*");
public static final ClusterPrivilege MANAGE_CCR = new ClusterPrivilege("manage_ccr", MANAGE_CCR_AUTOMATON);
public static final Predicate<String> ACTION_MATCHER = ClusterPrivilege.ALL.predicate();
@ -80,6 +83,7 @@ public final class ClusterPrivilege extends Privilege {
.put("manage_saml", MANAGE_SAML)
.put("manage_pipeline", MANAGE_PIPELINE)
.put("manage_rollup", MANAGE_ROLLUP)
.put("manage_ccr", MANAGE_CCR)
.immutableMap();
private static final ConcurrentHashMap<Set<String>, ClusterPrivilege> CACHE = new ConcurrentHashMap<>();

View File

@ -127,4 +127,13 @@ public class PrivilegeTests extends ESTestCase {
assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[p]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[r]"), is(true));
}
public void testManageCcrPrivilege() {
Predicate<String> predicate = ClusterPrivilege.MANAGE_CCR.predicate();
assertThat(predicate.test("cluster:admin/xpack/ccr/follow_index"), is(true));
assertThat(predicate.test("cluster:admin/xpack/ccr/unfollow_index"), is(true));
assertThat(predicate.test("cluster:admin/xpack/ccr/brand_new_api"), is(true));
assertThat(predicate.test("cluster:admin/xpack/whatever"), is(false));
}
}