[CCR] Added more validation to follow index api. (#31068)
This commit is contained in:
parent
1ccb34ac77
commit
cc824ebb5e
|
@ -19,23 +19,37 @@ import org.elasticsearch.client.ElasticsearchClient;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.IndexingSlowLog;
|
||||
import org.elasticsearch.index.SearchSlowLog;
|
||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.persistent.PersistentTasksService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.RemoteClusterAware;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -151,16 +165,18 @@ public class FollowIndexAction extends Action<FollowIndexAction.Request, FollowI
|
|||
private final ClusterService clusterService;
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
private final PersistentTasksService persistentTasksService;
|
||||
private final IndicesService indicesService;
|
||||
|
||||
@Inject
|
||||
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService,
|
||||
PersistentTasksService persistentTasksService) {
|
||||
PersistentTasksService persistentTasksService, IndicesService indicesService) {
|
||||
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
|
||||
this.client = client;
|
||||
this.clusterService = clusterService;
|
||||
this.remoteClusterService = transportService.getRemoteClusterService();
|
||||
this.persistentTasksService = persistentTasksService;
|
||||
this.indicesService = indicesService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,7 +189,12 @@ public class FollowIndexAction extends Action<FollowIndexAction.Request, FollowI
|
|||
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
|
||||
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
|
||||
IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex);
|
||||
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
|
||||
try {
|
||||
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
|
||||
assert remoteClusterIndices.size() == 1;
|
||||
|
@ -206,81 +227,168 @@ public class FollowIndexAction extends Action<FollowIndexAction.Request, FollowI
|
|||
* </ul>
|
||||
*/
|
||||
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
|
||||
ActionListener<Response> handler) {
|
||||
validate (leaderIndexMetadata ,followIndexMetadata , request);
|
||||
final int numShards = followIndexMetadata.getNumberOfShards();
|
||||
final AtomicInteger counter = new AtomicInteger(numShards);
|
||||
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
|
||||
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
|
||||
final int shardId = i;
|
||||
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
|
||||
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
|
||||
new ShardId(followIndexMetadata.getIndex(), shardId),
|
||||
new ShardId(leaderIndexMetadata.getIndex(), shardId),
|
||||
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
|
||||
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
|
||||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
|
||||
@Override
|
||||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
|
||||
responses.set(shardId, task);
|
||||
finalizeResponse();
|
||||
}
|
||||
|
||||
ActionListener<Response> handler) throws IOException {
|
||||
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
|
||||
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
|
||||
final int numShards = followIndexMetadata.getNumberOfShards();
|
||||
final AtomicInteger counter = new AtomicInteger(numShards);
|
||||
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
|
||||
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
|
||||
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
|
||||
final int shardId = i;
|
||||
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
|
||||
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
|
||||
new ShardId(followIndexMetadata.getIndex(), shardId),
|
||||
new ShardId(leaderIndexMetadata.getIndex(), shardId),
|
||||
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
|
||||
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
|
||||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
responses.set(shardId, e);
|
||||
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
|
||||
responses.set(shardId, task);
|
||||
finalizeResponse();
|
||||
}
|
||||
|
||||
void finalizeResponse() {
|
||||
Exception error = null;
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
for (int j = 0; j < responses.length(); j++) {
|
||||
Object response = responses.get(j);
|
||||
if (response instanceof Exception) {
|
||||
if (error == null) {
|
||||
error = (Exception) response;
|
||||
} else {
|
||||
error.addSuppressed((Throwable) response);
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
responses.set(shardId, e);
|
||||
finalizeResponse();
|
||||
}
|
||||
|
||||
void finalizeResponse() {
|
||||
Exception error = null;
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
for (int j = 0; j < responses.length(); j++) {
|
||||
Object response = responses.get(j);
|
||||
if (response instanceof Exception) {
|
||||
if (error == null) {
|
||||
error = (Exception) response;
|
||||
} else {
|
||||
error.addSuppressed((Throwable) response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (error == null) {
|
||||
// include task ids?
|
||||
handler.onResponse(new Response(true));
|
||||
} else {
|
||||
// TODO: cancel all started tasks
|
||||
handler.onFailure(error);
|
||||
}
|
||||
if (error == null) {
|
||||
// include task ids?
|
||||
handler.onResponse(new Response(true));
|
||||
} else {
|
||||
// TODO: cancel all started tasks
|
||||
handler.onFailure(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final Set<Setting<?>> WHITELISTED_SETTINGS;
|
||||
|
||||
static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
|
||||
static {
|
||||
Set<Setting<?>> whiteListedSettings = new HashSet<>();
|
||||
whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING);
|
||||
whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING);
|
||||
|
||||
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING);
|
||||
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING);
|
||||
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING);
|
||||
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING);
|
||||
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING);
|
||||
whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING);
|
||||
|
||||
whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.MAX_RESCORE_WINDOW_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.DEFAULT_FIELD_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.QUERY_STRING_LENIENT_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.QUERY_STRING_ANALYZE_WILDCARD);
|
||||
whiteListedSettings.add(IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD);
|
||||
whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED);
|
||||
whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER);
|
||||
whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
|
||||
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING);
|
||||
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
|
||||
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);
|
||||
|
||||
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING);
|
||||
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
|
||||
|
||||
WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
|
||||
}
|
||||
|
||||
static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) {
|
||||
if (leaderIndex == null) {
|
||||
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
|
||||
}
|
||||
|
||||
if (followIndex == null) {
|
||||
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
|
||||
}
|
||||
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
|
||||
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
|
||||
}
|
||||
|
||||
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
|
||||
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
|
||||
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
|
||||
}
|
||||
// TODO: other validation checks
|
||||
if (leaderIndex.getRoutingNumShards() != followIndex.getRoutingNumShards()) {
|
||||
throw new IllegalArgumentException("leader index number_of_routing_shards [" + leaderIndex.getRoutingNumShards() +
|
||||
"] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]");
|
||||
}
|
||||
if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) {
|
||||
throw new IllegalArgumentException("leader and follow index must be open");
|
||||
}
|
||||
|
||||
// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
|
||||
Settings leaderSettings = filter(leaderIndex.getSettings());
|
||||
Settings followerSettings = filter(followIndex.getSettings());
|
||||
if (leaderSettings.equals(followerSettings) == false) {
|
||||
throw new IllegalArgumentException("the leader and follower index settings must be identical");
|
||||
}
|
||||
|
||||
// Validates if the current follower mapping is mergable with the leader mapping.
|
||||
// This also validates for example whether specific mapper plugins have been installed
|
||||
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
|
||||
}
|
||||
|
||||
private static Settings filter(Settings originalSettings) {
|
||||
Settings.Builder settings = Settings.builder().put(originalSettings);
|
||||
// Remove settings that are always going to be different between leader and follow index:
|
||||
settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
|
||||
settings.remove(IndexMetaData.SETTING_INDEX_UUID);
|
||||
settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
|
||||
settings.remove(IndexMetaData.SETTING_CREATION_DATE);
|
||||
|
||||
Iterator<String> iterator = settings.keys().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String key = iterator.next();
|
||||
for (Setting<?> whitelistedSetting : WHITELISTED_SETTINGS) {
|
||||
if (whitelistedSetting.match(key)) {
|
||||
iterator.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return settings.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -7,60 +7,147 @@ package org.elasticsearch.xpack.ccr.action;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData.State;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.MapperTestUtils;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class FollowIndexActionTests extends ESTestCase {
|
||||
|
||||
public void testValidation() {
|
||||
public void testValidation() throws IOException {
|
||||
FollowIndexAction.Request request = new FollowIndexAction.Request();
|
||||
request.setLeaderIndex("index1");
|
||||
request.setFollowIndex("index2");
|
||||
|
||||
{
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(null, null, request));
|
||||
// should fail, because leader index does not exist
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null));
|
||||
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
|
||||
}
|
||||
{
|
||||
// should fail, because follow index does not exist
|
||||
IndexMetaData leaderIMD = createIMD("index1", 5);
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(leaderIMD, null, request));
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, leaderIMD, null, null));
|
||||
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
|
||||
}
|
||||
{
|
||||
// should fail because leader index does not have soft deletes enabled
|
||||
IndexMetaData leaderIMD = createIMD("index1", 5);
|
||||
IndexMetaData followIMD = createIMD("index2", 5);
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> FollowIndexAction.validate(leaderIMD, followIMD, request));
|
||||
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, null));
|
||||
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
|
||||
}
|
||||
{
|
||||
// should fail because the number of primary shards between leader and follow index are not equal
|
||||
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
IndexMetaData followIMD = createIMD("index2", 4);
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> FollowIndexAction.validate(leaderIMD, followIMD, request));
|
||||
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, null));
|
||||
assertThat(e.getMessage(),
|
||||
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
|
||||
}
|
||||
{
|
||||
// should fail, because leader index is closed
|
||||
IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, null));
|
||||
assertThat(e.getMessage(), equalTo("leader and follow index must be open"));
|
||||
}
|
||||
{
|
||||
// should fail, because leader has a field with the same name mapped as keyword and follower as text
|
||||
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5);
|
||||
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
|
||||
mapperService.updateMapping(followIMD);
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService));
|
||||
assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]"));
|
||||
}
|
||||
{
|
||||
// should fail because of non whitelisted settings not the same between leader and follow index
|
||||
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
|
||||
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace"));
|
||||
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5,
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard"));
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
() -> FollowIndexAction.validate(request, leaderIMD, followIMD, null));
|
||||
assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical"));
|
||||
}
|
||||
{
|
||||
// should succeed
|
||||
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
IndexMetaData followIMD = createIMD("index2", 5);
|
||||
FollowIndexAction.validate(leaderIMD, followIMD, request);
|
||||
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
|
||||
mapperService.updateMapping(followIMD);
|
||||
FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService);
|
||||
}
|
||||
{
|
||||
// should succeed, index settings are identical
|
||||
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
|
||||
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard"));
|
||||
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5,
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard"));
|
||||
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
|
||||
followIMD.getSettings(), "index2");
|
||||
mapperService.updateMapping(followIMD);
|
||||
FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService);
|
||||
}
|
||||
{
|
||||
// should succeed despite whitelisted settings being different
|
||||
String mapping = "{\"properties\": {\"field\": {\"type\": \"text\", \"analyzer\": \"my_analyzer\"}}}";
|
||||
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5,
|
||||
new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"),
|
||||
new Tuple<>(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard"));
|
||||
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5,
|
||||
new Tuple<>(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.type", "custom"),
|
||||
new Tuple<>("index.analysis.analyzer.my_analyzer.tokenizer", "standard"));
|
||||
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
|
||||
followIMD.getSettings(), "index2");
|
||||
mapperService.updateMapping(followIMD);
|
||||
FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService);
|
||||
}
|
||||
}
|
||||
|
||||
private static IndexMetaData createIMD(String index, int numShards, Tuple<?, ?>... settings) throws IOException {
|
||||
return createIMD(index, State.OPEN, "{\"properties\": {}}", numShards, settings);
|
||||
}
|
||||
|
||||
private static IndexMetaData createIMD(String index, int numShards, Tuple<?, ?>... settings) {
|
||||
private static IndexMetaData createIMD(String index, State state, String mapping, int numShards,
|
||||
Tuple<?, ?>... settings) throws IOException {
|
||||
Settings.Builder settingsBuilder = settings(Version.CURRENT);
|
||||
for (Tuple<?, ?> setting : settings) {
|
||||
settingsBuilder.put((String) setting.v1(), (String) setting.v2());
|
||||
}
|
||||
return IndexMetaData.builder(index).settings(settingsBuilder)
|
||||
.numberOfShards(numShards)
|
||||
.state(state)
|
||||
.numberOfReplicas(0)
|
||||
.setRoutingNumShards(numShards).build();
|
||||
.setRoutingNumShards(numShards)
|
||||
.putMapping("_doc", mapping)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue