diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index fad11aca127..b257cb14635 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -16,6 +16,8 @@ archivesBaseName = 'x-pack-ccr' integTest.enabled = false +compileJava.options.compilerArgs << "-Xlint:-try" + // Instead we create a separate task to run the // tests based on ESIntegTestCase task internalClusterTest(type: RandomizedTestingTask, @@ -30,7 +32,7 @@ task internalClusterTest(type: RandomizedTestingTask, } check { - dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest'] + dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest'] } internalClusterTest.mustRunAfter test diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle new file mode 100644 index 00000000000..cee78c0c910 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle @@ -0,0 +1,74 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'runtime') + testCompile project(path: xpackModule('ccr'), configuration: 'runtime') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' + setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.security.enabled', 'true' + setting 'xpack.monitoring.enabled', 'false' + extraConfigFile 'roles.yml', 'roles.yml' + setupCommand 'setupTestAdmin', + 'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser" + setupCommand 'setupCcrUser', + 'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "manage_ccr" + waitCondition = { node, ant -> + File tmpFile = new File(node.cwd, 'wait.success') + ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", + dest: tmpFile.toString(), + username: 'test_admin', + password: 'x-pack-test-password', + ignoreerrors: true, + retries: 10) + return tmpFile.exists() + } +} + +leaderClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'true' +} + +task followClusterTest(type: RestIntegTestTask) {} + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" + setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.security.enabled', 'true' + setting 'xpack.monitoring.enabled', 'false' + extraConfigFile 'roles.yml', 'roles.yml' + setupCommand 'setupTestAdmin', + 'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser" + setupCommand 'setupCcrUser', + 'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "ccruser" + waitCondition = { node, ant -> + File tmpFile = new File(node.cwd, 'wait.success') + ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", + dest: tmpFile.toString(), + username: 'test_admin', + password: 'x-pack-test-password', + ignoreerrors: true, + retries: 10) + return tmpFile.exists() + } +} + +followClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + finalizedBy 'leaderClusterTestCluster#stop' +} + +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml new file mode 100644 index 00000000000..9ae4922b8e6 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/roles.yml @@ -0,0 +1,9 @@ +ccruser: + cluster: + - manage_ccr + indices: + - names: [ 'index1' ] + privileges: + - monitor + - read + - write diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java new file mode 100644 index 00000000000..cb7a1879f3b --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -0,0 +1,187 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.equalTo; + +public class FollowIndexSecurityIT extends ESRestTestCase { + + private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); + + @Override + protected Settings restClientSettings() { + String token = basicAuthHeaderValue("test_ccr", new SecureString("x-pack-test-password".toCharArray())); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + @Override + protected Settings restAdminSettings() { + String token = basicAuthHeaderValue("test_admin", new SecureString("x-pack-test-password".toCharArray())); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + public void testFollowIndex() throws Exception { + final int numDocs = 16; + final String indexName1 = "index1"; + final String indexName2 = "index2"; + if (runningAgainstLeaderCluster) { + logger.info("Running against leader cluster"); + for (int i = 0; i < numDocs; i++) { + logger.info("Indexing doc [{}]", i); + index(indexName1, Integer.toString(i), "field", i); + } + for (int i = 0; i < numDocs; i++) { + logger.info("Indexing doc [{}]", i); + index(indexName2, Integer.toString(i), "field", i); + } + refresh(indexName1); + verifyDocuments(adminClient(), indexName1, numDocs); + } else { + logger.info("Running against follow cluster"); + Settings indexSettings = Settings.builder() + .put("index.xpack.ccr.following_index", true) + .build(); + // TODO: remove mapping here when ccr syncs mappings too + createIndex(indexName1, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}"); + ensureYellow(indexName1); + followIndex("leader_cluster:" + indexName1, indexName1); + assertBusy(() -> verifyDocuments(client(), indexName1, numDocs)); + assertThat(countCcrNodeTasks(), equalTo(1)); + assertOK(client().performRequest("POST", "/_xpack/ccr/" + indexName1 + "/_unfollow")); + // Make sure that there are no other ccr relates operations running: + assertBusy(() -> { + Map clusterState = toMap(adminClient().performRequest("GET", "/_cluster/state")); + List tasks = (List) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState); + assertThat(tasks.size(), equalTo(0)); + assertThat(countCcrNodeTasks(), equalTo(0)); + }); + + // TODO: remove mapping here when ccr syncs mappings too + createIndex(indexName2, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"long\" }}}"); + ensureYellow(indexName2); + followIndex("leader_cluster:" + indexName2, indexName2); + // Verify that nothing has been replicated and no node tasks are running + // These node tasks should have been failed due to the fact that the user + // has no sufficient priviledges. + assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); + verifyDocuments(adminClient(), indexName2, 0); + } + } + + private int countCcrNodeTasks() throws IOException { + Map rsp1 = toMap(adminClient().performRequest("GET", "/_tasks", + Collections.singletonMap("detailed", "true"))); + Map nodes = (Map) rsp1.get("nodes"); + assertThat(nodes.size(), equalTo(1)); + Map node = (Map) nodes.values().iterator().next(); + Map nodeTasks = (Map) node.get("tasks"); + int numNodeTasks = 0; + for (Map.Entry entry : nodeTasks.entrySet()) { + Map nodeTask = (Map) entry.getValue(); + String action = (String) nodeTask.get("action"); + if (action.startsWith("shard_follow")) { + numNodeTasks++; + } + } + return numNodeTasks; + } + + private static void index(String index, String id, Object... fields) throws IOException { + XContentBuilder document = jsonBuilder().startObject(); + for (int i = 0; i < fields.length; i += 2) { + document.field((String) fields[i], fields[i + 1]); + } + document.endObject(); + assertOK(adminClient().performRequest("POST", "/" + index + "/doc/" + id, emptyMap(), + new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON))); + } + + private static void refresh(String index) throws IOException { + assertOK(adminClient().performRequest("POST", "/" + index + "/_refresh")); + } + + private static void followIndex(String leaderIndex, String followIndex) throws IOException { + Map params = Collections.singletonMap("leader_index", leaderIndex); + assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params)); + } + + void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException { + Map params = new HashMap<>(); + params.put("size", Integer.toString(expectedNumDocs)); + params.put("sort", "field:asc"); + params.put("pretty", "true"); + Map response = toMap(client.performRequest("GET", "/" + index + "/_search", params)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, equalTo(expectedNumDocs)); + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), equalTo(expectedNumDocs)); + for (int i = 0; i < expectedNumDocs; i++) { + int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); + assertThat(i, equalTo(value)); + } + } + + private static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + private static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + protected static void createIndex(String name, Settings settings, String mapping) throws IOException { + assertOK(adminClient().performRequest(HttpPut.METHOD_NAME, name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings) + + ", \"mappings\" : {" + mapping + "} }", ContentType.APPLICATION_JSON))); + } + + private static void ensureYellow(String index) throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "yellow"); + params.put("wait_for_no_relocating_shards", "true"); + params.put("timeout", "30s"); + params.put("level", "shards"); + assertOK(adminClient().performRequest("GET", "_cluster/health/" + index, params)); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java index 2954ffbd440..963f17d9758 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; public class FollowExistingIndexAction extends Action { @@ -242,13 +243,16 @@ public class FollowExistingIndexAction extends Action responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes); + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 0e61521f95f..d4bae449d2b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -42,7 +42,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; public class ShardChangesAction extends Action { public static final ShardChangesAction INSTANCE = new ShardChangesAction(); - public static final String NAME = "cluster:admin/xpack/ccr/shard_changes"; + public static final String NAME = "indices:data/read/xpack/ccr/shard_changes"; private ShardChangesAction() { super(NAME); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 09202c925ba..3b2ec2eb900 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -10,20 +10,27 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.tasks.Task; import org.elasticsearch.persistent.PersistentTaskParams; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; public class ShardFollowTask implements PersistentTaskParams { public static final String NAME = "shard_follow"; + // list of headers that will be stored when a job is created + public static final Set HEADER_FILTERS = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"))); + static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias"); static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index"); static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid"); @@ -31,13 +38,16 @@ public class ShardFollowTask implements PersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_FIELD = new ParseField("leader_shard_index"); static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); + static final ParseField HEADERS = new ParseField("headers"); public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); + @SuppressWarnings("unchecked") public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), - new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9])); + new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9], + (Map) a[10])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -50,6 +60,7 @@ public class ShardFollowTask implements PersistentTaskParams { PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final String leaderClusterAlias; @@ -58,15 +69,17 @@ public class ShardFollowTask implements PersistentTaskParams { private final long maxChunkSize; private final int numConcurrentChunks; private final long processorMaxTranslogBytes; + private final Map headers; ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize, - int numConcurrentChunks, long processorMaxTranslogBytes) { + int numConcurrentChunks, long processorMaxTranslogBytes, Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; this.maxChunkSize = maxChunkSize; this.numConcurrentChunks = numConcurrentChunks; this.processorMaxTranslogBytes = processorMaxTranslogBytes; + this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } public ShardFollowTask(StreamInput in) throws IOException { @@ -76,6 +89,7 @@ public class ShardFollowTask implements PersistentTaskParams { this.maxChunkSize = in.readVLong(); this.numConcurrentChunks = in.readVInt(); this.processorMaxTranslogBytes = in.readVLong(); + this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public String getLeaderClusterAlias() { @@ -102,6 +116,10 @@ public class ShardFollowTask implements PersistentTaskParams { return processorMaxTranslogBytes; } + public Map getHeaders() { + return headers; + } + @Override public String getWriteableName() { return NAME; @@ -115,6 +133,7 @@ public class ShardFollowTask implements PersistentTaskParams { out.writeVLong(maxChunkSize); out.writeVInt(numConcurrentChunks); out.writeVLong(processorMaxTranslogBytes); + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } public static ShardFollowTask fromXContent(XContentParser parser) { @@ -136,6 +155,7 @@ public class ShardFollowTask implements PersistentTaskParams { builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes); + builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -149,13 +169,14 @@ public class ShardFollowTask implements PersistentTaskParams { Objects.equals(leaderShardId, that.leaderShardId) && maxChunkSize == that.maxChunkSize && numConcurrentChunks == that.numConcurrentChunks && - processorMaxTranslogBytes == that.processorMaxTranslogBytes; + processorMaxTranslogBytes == that.processorMaxTranslogBytes && + Objects.equals(headers, that.headers); } @Override public int hashCode() { return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, - processorMaxTranslogBytes); + processorMaxTranslogBytes, headers); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 718a72388a1..39df5407365 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -8,11 +8,17 @@ package org.elasticsearch.xpack.ccr.action; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.logging.Loggers; @@ -21,6 +27,7 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -44,6 +51,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { @@ -89,17 +98,19 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint); - prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint); + prepare(leaderClient, followerClient,shardFollowNodeTask, params, followGlobalCheckPoint); }, task::markAsFailed); } - void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { + void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, + long followGlobalCheckPoint) { if (task.getState() != AllocatedPersistentTask.State.STARTED) { // TODO: need better cancellation control return; @@ -111,7 +122,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler = e -> { if (e == null) { task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - prepare(leaderClient, task, params, leaderGlobalCheckPoint); + prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint); } else { task.markAsFailed(e); } }; - ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(), + ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler); coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); coordinator.start(); @@ -134,7 +145,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor filteredHeaders = shardFollowTask.getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new FilterClient(client) { + @Override + protected < + Request extends ActionRequest, + Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder> + void doExecute(Action action, Request request, ActionListener listener) { + final Supplier supplier = threadContext.newRestorableContext(false); + try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, filteredHeaders)) { + super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + } + } + }; + } + } + + private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map headers) { + final ThreadContext.StoredContext storedContext = threadContext.stashContext(); + threadContext.copyHeaders(headers.entrySet()); + return storedContext; + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 148f3acd209..b128e88e63a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.util.Collections; public class ShardFollowTaskTests extends AbstractSerializingTestCase { @@ -26,7 +27,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase ACTION_MATCHER = ClusterPrivilege.ALL.predicate(); @@ -80,6 +83,7 @@ public final class ClusterPrivilege extends Privilege { .put("manage_saml", MANAGE_SAML) .put("manage_pipeline", MANAGE_PIPELINE) .put("manage_rollup", MANAGE_ROLLUP) + .put("manage_ccr", MANAGE_CCR) .immutableMap(); private static final ConcurrentHashMap, ClusterPrivilege> CACHE = new ConcurrentHashMap<>(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 6857a48784b..c4c95211d4c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -127,4 +127,13 @@ public class PrivilegeTests extends ESTestCase { assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[p]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[r]"), is(true)); } + + public void testManageCcrPrivilege() { + Predicate predicate = ClusterPrivilege.MANAGE_CCR.predicate(); + assertThat(predicate.test("cluster:admin/xpack/ccr/follow_index"), is(true)); + assertThat(predicate.test("cluster:admin/xpack/ccr/unfollow_index"), is(true)); + assertThat(predicate.test("cluster:admin/xpack/ccr/brand_new_api"), is(true)); + assertThat(predicate.test("cluster:admin/xpack/whatever"), is(false)); + } + }