diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index db7948cfb41..1dca0522eb7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -19,23 +19,37 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexingSlowLog; +import org.elasticsearch.index.SearchSlowLog; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesRequestCache; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; @@ -151,16 +165,18 @@ public class FollowIndexAction extends Action */ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, - ActionListener handler) { - validate (leaderIndexMetadata ,followIndexMetadata , request); - final int numShards = followIndexMetadata.getNumberOfShards(); - final AtomicInteger counter = new AtomicInteger(numShards); - final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); - Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() - .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { - final int shardId = i; - String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); - persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - responses.set(shardId, task); - finalizeResponse(); - } - + ActionListener handler) throws IOException { + MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; + validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); + persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, + new ActionListener>() { @Override - public void onFailure(Exception e) { - responses.set(shardId, e); + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); finalizeResponse(); } - void finalizeResponse() { - Exception error = null; - if (counter.decrementAndGet() == 0) { - for (int j = 0; j < responses.length(); j++) { - Object response = responses.get(j); - if (response instanceof Exception) { - if (error == null) { - error = (Exception) response; - } else { - error.addSuppressed((Throwable) response); - } + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); } } + } - if (error == null) { - // include task ids? - handler.onResponse(new Response(true)); - } else { - // TODO: cancel all started tasks - handler.onFailure(error); - } + if (error == null) { + // include task ids? + handler.onResponse(new Response(true)); + } else { + // TODO: cancel all started tasks + handler.onFailure(error); } } } + } ); } } } + private static final Set> WHITELISTED_SETTINGS; - static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) { + static { + Set> whiteListedSettings = new HashSet<>(); + whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING); + whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING); + + whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING); + whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING); + whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING); + whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING); + whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING); + whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING); + + whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING); + whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING); + whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING); + whiteListedSettings.add(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING); + whiteListedSettings.add(IndexSettings.MAX_RESCORE_WINDOW_SETTING); + whiteListedSettings.add(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING); + whiteListedSettings.add(IndexSettings.DEFAULT_FIELD_SETTING); + whiteListedSettings.add(IndexSettings.QUERY_STRING_LENIENT_SETTING); + whiteListedSettings.add(IndexSettings.QUERY_STRING_ANALYZE_WILDCARD); + whiteListedSettings.add(IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD); + whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED); + whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER); + whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING); + + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING); + whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING); + whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING); + + whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING); + whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); + + WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings); + } + + static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) { if (leaderIndex == null) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); } - if (followIndex == null) { throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"); } if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) { throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled"); } - if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) { throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() + "] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]"); } - // TODO: other validation checks + if (leaderIndex.getRoutingNumShards() != followIndex.getRoutingNumShards()) { + throw new IllegalArgumentException("leader index number_of_routing_shards [" + leaderIndex.getRoutingNumShards() + + "] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]"); + } + if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) { + throw new IllegalArgumentException("leader and follow index must be open"); + } + + // Make a copy, remove settings that are allowed to be different and then compare if the settings are equal. + Settings leaderSettings = filter(leaderIndex.getSettings()); + Settings followerSettings = filter(followIndex.getSettings()); + if (leaderSettings.equals(followerSettings) == false) { + throw new IllegalArgumentException("the leader and follower index settings must be identical"); + } + + // Validates if the current follower mapping is mergable with the leader mapping. + // This also validates for example whether specific mapper plugins have been installed + followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY); + } + + private static Settings filter(Settings originalSettings) { + Settings.Builder settings = Settings.builder().put(originalSettings); + // Remove settings that are always going to be different between leader and follow index: + settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()); + settings.remove(IndexMetaData.SETTING_INDEX_UUID); + settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME); + settings.remove(IndexMetaData.SETTING_CREATION_DATE); + + Iterator iterator = settings.keys().iterator(); + while (iterator.hasNext()) { + String key = iterator.next(); + for (Setting whitelistedSetting : WHITELISTED_SETTINGS) { + if (whitelistedSetting.match(key)) { + iterator.remove(); + break; + } + } + } + return settings.build(); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index e67119bd76b..a27294ccf2d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -7,60 +7,147 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData.State; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MapperTestUtils; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; + import static org.hamcrest.Matchers.equalTo; public class FollowIndexActionTests extends ESTestCase { - public void testValidation() { + public void testValidation() throws IOException { FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex("index1"); request.setFollowIndex("index2"); { - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(null, null, request)); + // should fail, because leader index does not exist + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { + // should fail, because follow index does not exist IndexMetaData leaderIMD = createIMD("index1", 5); - Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(leaderIMD, null, request)); + Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } { + // should fail because leader index does not have soft deletes enabled IndexMetaData leaderIMD = createIMD("index1", 5); IndexMetaData followIMD = createIMD("index2", 5); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(leaderIMD, followIMD, request)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { + // should fail because the number of primary shards between leader and follow index are not equal IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 4); Exception e = expectThrows(IllegalArgumentException.class, - () -> FollowIndexAction.validate(leaderIMD, followIMD, request)); + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); assertThat(e.getMessage(), equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { + // should fail, because leader index is closed + IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + assertThat(e.getMessage(), equalTo("leader and follow index must be open")); + } + { + // should fail, because leader has a field with the same name mapped as keyword and follower as text + IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5); + MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); + mapperService.updateMapping(followIMD); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService)); + assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]")); + } + { + // should fail because of non whitelisted settings not the same between leader and follow index + String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; + IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace")); + IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard")); + Exception e = expectThrows(IllegalArgumentException.class, + () -> FollowIndexAction.validate(request, leaderIMD, followIMD, null)); + assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical")); + } + { + // should succeed IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); IndexMetaData followIMD = createIMD("index2", 5); - FollowIndexAction.validate(leaderIMD, followIMD, request); + MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); + mapperService.updateMapping(followIMD); + FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + } + { + // should succeed, index settings are identical + String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; + IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard")); + IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard")); + MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), + followIMD.getSettings(), "index2"); + mapperService.updateMapping(followIMD); + FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); + } + { + // should succeed despite whitelisted settings being different + String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}"; + IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, + new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"), + new Tuple<>(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"), + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard")); + IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, + new Tuple<>(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s"), + new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"), + new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard")); + MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), + followIMD.getSettings(), "index2"); + mapperService.updateMapping(followIMD); + FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); } } + + private static IndexMetaData createIMD(String index, int numShards, Tuple... settings) throws IOException { + return createIMD(index, State.OPEN, "{\"properties\": {}}", numShards, settings); + } - private static IndexMetaData createIMD(String index, int numShards, Tuple... settings) { + private static IndexMetaData createIMD(String index, State state, String mapping, int numShards, + Tuple... settings) throws IOException { Settings.Builder settingsBuilder = settings(Version.CURRENT); for (Tuple setting : settings) { settingsBuilder.put((String) setting.v1(), (String) setting.v2()); } return IndexMetaData.builder(index).settings(settingsBuilder) .numberOfShards(numShards) + .state(state) .numberOfReplicas(0) - .setRoutingNumShards(numShards).build(); + .setRoutingNumShards(numShards) + .putMapping("_doc", mapping) + .build(); } }