[CCR] Add validation checks that were left out of #30120 (#30463)

This commit is contained in:
Martijn van Groningen 2018-05-16 09:46:03 +02:00 committed by GitHub
parent 23204e3d09
commit 596ec1848e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 130 additions and 44 deletions

View File

@ -64,6 +64,11 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
final String indexName2 = "index2"; final String indexName2 = "index2";
if (runningAgainstLeaderCluster) { if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster"); logger.info("Running against leader cluster");
Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
createIndex(indexName1, indexSettings);
createIndex(indexName2, indexSettings);
for (int i = 0; i < numDocs; i++) { for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i); logger.info("Indexing doc [{}]", i);
index(indexName1, Integer.toString(i), "field", i); index(indexName1, Integer.toString(i), "field", i);
@ -169,6 +174,10 @@ public class FollowIndexSecurityIT extends ESRestTestCase {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
} }
protected static void createIndex(String name, Settings settings) throws IOException {
createIndex(name, settings, "");
}
protected static void createIndex(String name, Settings settings, String mapping) throws IOException { protected static void createIndex(String name, Settings settings, String mapping) throws IOException {
assertOK(adminClient().performRequest(HttpPut.METHOD_NAME, name, Collections.emptyMap(), assertOK(adminClient().performRequest(HttpPut.METHOD_NAME, name, Collections.emptyMap(),
new StringEntity("{ \"settings\": " + Strings.toString(settings) new StringEntity("{ \"settings\": " + Strings.toString(settings)

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.persistent.PersistentTasksService;
@ -224,29 +225,13 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
*/ */
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) { ActionListener<Response> handler) {
if (leaderIndexMetadata == null) { validate (leaderIndexMetadata ,followIndexMetadata , request);
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
return;
}
if (followIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
return;
}
if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
// TODO: other validation checks
} else {
final int numShards = followIndexMetadata.getNumberOfShards(); final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards); final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
for (int i = 0; i < numShards; i++) {
final int shardId = i; final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
@ -261,39 +246,59 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
finalizeResponse(); finalizeResponse();
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
responses.set(shardId, e); responses.set(shardId, e);
finalizeResponse(); finalizeResponse();
} }
void finalizeResponse() { void finalizeResponse() {
Exception error = null; Exception error = null;
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) { for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j); Object response = responses.get(j);
if (response instanceof Exception) { if (response instanceof Exception) {
if (error == null) { if (error == null) {
error = (Exception) response; error = (Exception) response;
} else { } else {
error.addSuppressed((Throwable) response); error.addSuppressed((Throwable) response);
}
} }
} }
}
if (error == null) { if (error == null) {
// include task ids? // include task ids?
handler.onResponse(new Response(true)); handler.onResponse(new Response(true));
} else { } else {
// TODO: cancel all started tasks // TODO: cancel all started tasks
handler.onFailure(error); handler.onFailure(error);
}
} }
} }
} }
); }
} );
} }
} }
} }
static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
}
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
}
// TODO: other validation checks
}
} }

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -143,7 +144,8 @@ public class ShardChangesIT extends ESIntegTestCase {
public void testFollowIndex() throws Exception { public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3); final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap()); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
final String followerIndexSettings = final String followerIndexSettings =

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class FollowExistingIndexActionTests extends ESTestCase {
public void testValidation() {
FollowExistingIndexAction.Request request = new FollowExistingIndexAction.Request();
request.setLeaderIndex("index1");
request.setFollowIndex("index2");
{
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(null, null, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5);
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(leaderIMD, null, request));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5);
IndexMetaData followIMD = createIMD("index2", 5);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 4);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(),
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 5);
FollowExistingIndexAction.validate(leaderIMD, followIMD, request);
}
}
private static IndexMetaData createIMD(String index, int numShards, Tuple<?, ?>... settings) {
Settings.Builder settingsBuilder = settings(Version.CURRENT);
for (Tuple<?, ?> setting : settings) {
settingsBuilder.put((String) setting.v1(), (String) setting.v2());
}
return IndexMetaData.builder(index).settings(settingsBuilder)
.numberOfShards(numShards)
.numberOfReplicas(0)
.setRoutingNumShards(numShards).build();
}
}

View File

@ -4,6 +4,10 @@
indices.create: indices.create:
index: foo index: foo
body: body:
settings:
index:
soft_deletes:
enabled: true
mappings: mappings:
doc: doc:
properties: properties: