CCR: Requires soft-deletes on the follower (#34725)

Since #34412 and #34474, a follower must have soft-deletes enabled 
to work correctly. This change requires soft-deletes on the follower.

Relates #34412
Relates #34474
This commit is contained in:
Nhat Nguyen 2018-10-23 11:51:17 -04:00 committed by GitHub
parent cadd6731e7
commit 5923ea536e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 9 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -168,6 +169,7 @@ public final class TransportPutFollowAction
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);
// Copy mappings from leader IMD to follow IMD

View File

@ -240,6 +240,9 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + leaderIndexName + "] does not have soft deletes enabled");
}
if (followIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("follower index [" + request.getFollowerIndex() + "] does not have soft deletes enabled");
}
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
@ -382,7 +385,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING);
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
WHITE_LISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);

View File

@ -49,6 +49,9 @@ public final class FollowingEngine extends InternalEngine {
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) {
throw new IllegalArgumentException("a following engine can not be constructed for a non-following index");
}
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
throw new IllegalArgumentException("a following engine requires soft deletes to be enabled");
}
return engineConfig;
}

View File

@ -85,11 +85,20 @@ public class TransportResumeFollowActionTests extends ESTestCase {
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not have soft deletes enabled"));
}
{
// should fail because the follower index does not have soft deletes enabled
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("follower index [index2] does not have soft deletes enabled"));
}
{
// should fail because the number of primary shards between leader and follow index are not equal
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY, customMetaData);
IndexMetaData followIMD = createIMD("index2", 4,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(),
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
@ -98,8 +107,8 @@ public class TransportResumeFollowActionTests extends ESTestCase {
// should fail, because leader index is closed
IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), customMetaData);
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader and follow index must be open"));
}
@ -107,7 +116,8 @@ public class TransportResumeFollowActionTests extends ESTestCase {
// should fail, because index.xpack.ccr.following_index setting has not been enabled in leader index
IndexMetaData leaderIMD = createIMD("index1", 1,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", 1, Settings.EMPTY, customMetaData);
IndexMetaData followIMD = createIMD("index2", 1,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), customMetaData);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
mapperService.updateMapping(null, followIMD);
Exception e = expectThrows(IllegalArgumentException.class,
@ -120,7 +130,8 @@ public class TransportResumeFollowActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5,
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), customMetaData);
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(), customMetaData);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
mapperService.updateMapping(null, followIMD);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService));
@ -135,6 +146,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
.put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build(), null);
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
@ -144,8 +156,8 @@ public class TransportResumeFollowActionTests extends ESTestCase {
// should fail because the following index does not have the following_index settings
IndexMetaData leaderIMD = createIMD("index1", 5,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
Settings followingIndexSettings = randomBoolean() ? Settings.EMPTY :
Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build();
Settings followingIndexSettings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build();
IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings, customMetaData);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
followingIndexSettings, "index2");
@ -160,6 +172,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), null);
IndexMetaData followIMD = createIMD("index2", 5, Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), customMetaData);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2");
mapperService.updateMapping(null, followIMD);
@ -174,6 +187,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), null);
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetaData);
MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
@ -191,6 +205,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), null);
IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s")
.put("index.analysis.analyzer.my_analyzer.type", "custom")
.put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetaData);

View File

@ -41,7 +41,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
// test that we use the primary term on the follower when applying operations from the leader
public void testPrimaryTermFromFollower() throws IOException {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexShard followerPrimary = newStartedShard(true, settings, new FollowingEngineFactory());
// we use this primary on the operations yet we expect the applied operations to have the primary term of the follower

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -31,6 +32,7 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
public void testDoNotFillGaps() throws Exception {
Settings settings = Settings.builder()
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexShard indexShard = newStartedShard(false, settings, new FollowingEngineFactory());

View File

@ -127,6 +127,7 @@ public class FollowingEngineTests extends ESTestCase {
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.put("index.soft_deletes.enabled", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
@ -152,6 +153,7 @@ public class FollowingEngineTests extends ESTestCase {
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.put("index.soft_deletes.enabled", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
@ -186,6 +188,7 @@ public class FollowingEngineTests extends ESTestCase {
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.put("index.soft_deletes.enabled", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
@ -216,6 +219,7 @@ public class FollowingEngineTests extends ESTestCase {
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put("index.xpack.ccr.following_index", true)
.put("index.soft_deletes.enabled", true)
.build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);