Changed CCR internal integration tests to use a leader and follower cluster instead of a single cluster (#34344)

The `AutoFollowTests` needs to restart the clusters between each tests, because
it is using auto follow stats in assertions. Auto follow stats are only reset
by stopping the elected master node.

Extracted the `testGetOperationsBasedOnGlobalSequenceId()` test to its own test, because it just tests the shard changes api.

* Renamed AutoFollowTests to AutoFollowIT, because it is an integration test.
Renamed ShardChangesIT to IndexFollowingIT, because shard changes it the name
of an internal api and isn't a good name for an integration test.

* move creation of NodeConfigurationSource to a seperate method

* Fixes issues after merge, moved assertSeqNos() and assertSameDocIdsOnShards() methods from ESIntegTestCase to InternalTestCluster, so that ccr tests can use these methods too.
This commit is contained in:
Martijn van Groningen 2018-10-16 14:45:46 +02:00 committed by GitHub
parent 67e7464601
commit a1ec91395c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 712 additions and 427 deletions

View File

@ -111,8 +111,8 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
if (disableBeforeIndexDeletion == false) {
super.beforeIndexDeletion();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
assertSeqNos();
assertSameDocIdsOnShards();
internalCluster().assertSeqNos();
internalCluster().assertSameDocIdsOnShards();
}
}

View File

@ -102,8 +102,8 @@ public class RelocationIT extends ESIntegTestCase {
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertSeqNos();
assertSameDocIdsOnShards();
internalCluster().assertSeqNos();
internalCluster().assertSameDocIdsOnShards();
}
public void testSimpleRelocationNoIndexing() {

View File

@ -19,16 +19,12 @@
package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -78,7 +74,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -125,15 +120,10 @@ import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MockFieldFilterPlugin;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
@ -197,7 +187,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -221,7 +210,6 @@ import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
@ -2359,108 +2347,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
return new Index(index, uuid);
}
protected void assertSeqNos() throws Exception {
final BiFunction<ClusterState, ShardRouting, IndexShard> getInstanceShardInstance = (clusterState, shardRouting) -> {
if (shardRouting.assignedToNode() == false) {
return null;
}
final DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
if (assignedNode == null) {
return null;
}
return internalCluster().getInstance(IndicesService.class, assignedNode.getName()).getShardOrNull(shardRouting.shardId());
};
assertBusy(() -> {
final ClusterState state = clusterService().state();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null) {
continue;
}
final IndexShard primaryShard = getInstanceShardInstance.apply(state, primaryShardRouting);
if (primaryShard == null) {
continue; //just ignore - shard movement
}
final SeqNoStats primarySeqNoStats;
final ObjectLongMap<String> syncGlobalCheckpoints;
try {
primarySeqNoStats = primaryShard.seqNoStats();
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
} catch (AlreadyClosedException ex) {
continue; // shard is closed - just ignore
}
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
final IndexShard replicaShard = getInstanceShardInstance.apply(state, replicaShardRouting);
if (replicaShard == null) {
continue; //just ignore - shard movement
}
final SeqNoStats seqNoStats;
try {
seqNoStats = replicaShard.seqNoStats();
} catch (AlreadyClosedException e) {
continue; // shard is closed - just ignore
}
assertThat(replicaShardRouting + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(replicaShardRouting + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(replicaShardRouting + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
}
}
}
});
}
/**
* Asserts that all shards with the same shardId should have document Ids.
*/
public void assertSameDocIdsOnShards() throws Exception {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnPrimary;
try {
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
} catch (AlreadyClosedException ex) {
continue;
}
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
if (replicaShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnReplica;
try {
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
} catch (AlreadyClosedException ex) {
continue;
}
assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size()
+ "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]",
docsOnReplica, equalTo(docsOnPrimary));
}
}
}
});
}
public static boolean inFipsJvm() {
return Security.getProviders()[0].getName().toLowerCase(Locale.ROOT).contains("fips");
}

View File

@ -18,6 +18,9 @@
*/
package org.elasticsearch.test;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.carrotsearch.randomizedtesting.SysGlobals;
@ -40,6 +43,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@ -76,8 +81,11 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@ -127,6 +135,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -148,6 +157,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@ -1273,6 +1283,108 @@ public final class InternalTestCluster extends TestCluster {
}
}
public void assertSeqNos() throws Exception {
final BiFunction<ClusterState, ShardRouting, IndexShard> getInstanceShardInstance = (clusterState, shardRouting) -> {
if (shardRouting.assignedToNode() == false) {
return null;
}
final DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
if (assignedNode == null) {
return null;
}
return getInstance(IndicesService.class, assignedNode.getName()).getShardOrNull(shardRouting.shardId());
};
assertBusy(() -> {
final ClusterState state = clusterService().state();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null) {
continue;
}
final IndexShard primaryShard = getInstanceShardInstance.apply(state, primaryShardRouting);
if (primaryShard == null) {
continue; //just ignore - shard movement
}
final SeqNoStats primarySeqNoStats;
final ObjectLongMap<String> syncGlobalCheckpoints;
try {
primarySeqNoStats = primaryShard.seqNoStats();
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
} catch (AlreadyClosedException ex) {
continue; // shard is closed - just ignore
}
assertThat(primaryShardRouting + " should have set the global checkpoint",
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
final IndexShard replicaShard = getInstanceShardInstance.apply(state, replicaShardRouting);
if (replicaShard == null) {
continue; //just ignore - shard movement
}
final SeqNoStats seqNoStats;
try {
seqNoStats = replicaShard.seqNoStats();
} catch (AlreadyClosedException e) {
continue; // shard is closed - just ignore
}
assertThat(replicaShardRouting + " local checkpoint mismatch",
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
assertThat(replicaShardRouting + " global checkpoint mismatch",
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
assertThat(replicaShardRouting + " max seq no mismatch",
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
}
}
}
});
}
/**
* Asserts that all shards with the same shardId should have document Ids.
*/
public void assertSameDocIdsOnShards() throws Exception {
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().get().getState();
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
IndexShard primaryShard = getInstance(IndicesService.class, primaryNode.getName())
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnPrimary;
try {
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
} catch (AlreadyClosedException ex) {
continue;
}
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
if (replicaShardRouting.assignedToNode() == false) {
continue;
}
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
IndexShard replicaShard = getInstance(IndicesService.class, replicaNode.getName())
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
final List<DocIdSeqNoAndTerm> docsOnReplica;
try {
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
} catch (AlreadyClosedException ex) {
continue;
}
assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size()
+ "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]",
docsOnReplica, equalTo(docsOnPrimary));
}
}
}
});
}
private void randomlyResetClients() throws IOException {
// only reset the clients on nightly tests, it causes heavy load...
if (RandomizedTest.isNightly() && rarely(random)) {

View File

@ -0,0 +1,299 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public abstract class CCRIntegTestCase extends ESTestCase {
private static ClusterGroup clusterGroup;
@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
return;
}
stopClusters();
NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "leader", mockPlugins,
Function.identity());
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "follower", mockPlugins,
Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
followerCluster.beforeTest(random(), 0.0D);
followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
}
@After
public void afterTest() throws Exception {
String masterNode = clusterGroup.followerCluster.getMasterName();
ClusterService clusterService = clusterGroup.followerCluster.getInstance(ClusterService.class, masterNode);
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metaData(MetaData.builder(currentState.getMetaData())
.removeCustom(AutoFollowMetadata.TYPE)
.build());
return newState.build();
}
@Override
public void onFailure(String source, Exception e) {
latch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
latch.await();
clusterGroup.leaderCluster.beforeIndexDeletion();
clusterGroup.leaderCluster.assertSeqNos();
clusterGroup.leaderCluster.assertSameDocIdsOnShards();
clusterGroup.leaderCluster.assertConsistentHistoryBetweenTranslogAndLuceneIndex();
clusterGroup.leaderCluster.wipe(Collections.emptySet());
clusterGroup.followerCluster.beforeIndexDeletion();
clusterGroup.followerCluster.assertSeqNos();
clusterGroup.followerCluster.assertSameDocIdsOnShards();
clusterGroup.followerCluster.assertConsistentHistoryBetweenTranslogAndLuceneIndex();
clusterGroup.followerCluster.wipe(Collections.emptySet());
}
private NodeConfigurationSource createNodeConfigurationSource() {
Settings.Builder builder = Settings.builder();
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
// Default the watermarks to absurdly low to prevent the tests
// from failing on nodes without enough disk space
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b");
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b");
builder.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "2048/1m");
// wait short time for other active shards before actually deleting, default 30s not needed in tests
builder.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT.getKey(), new TimeValue(1, TimeUnit.SECONDS));
builder.putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey()); // empty list disables a port scan for other nodes
builder.putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file");
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
builder.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
builder.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
return new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return builder.build();
}
@Override
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
}
@Override
public Settings transportClientSettings() {
return super.transportClientSettings();
}
@Override
public Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(LocalStateCcr.class, getTestTransportPlugin());
}
};
}
@AfterClass
public static void stopClusters() throws IOException {
IOUtils.close(clusterGroup);
clusterGroup = null;
}
protected int numberOfNodesPerCluster() {
return 2;
}
protected boolean reuseClusters() {
return true;
}
protected final Client leaderClient() {
return clusterGroup.leaderCluster.client();
}
protected final Client followerClient() {
return clusterGroup.followerCluster.client();
}
protected final InternalTestCluster getLeaderCluster() {
return clusterGroup.leaderCluster;
}
protected final InternalTestCluster getFollowerCluster() {
return clusterGroup.followerCluster;
}
protected final ClusterHealthStatus ensureLeaderYellow(String... indices) {
return ensureColor(clusterGroup.leaderCluster, ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30), false, indices);
}
protected final ClusterHealthStatus ensureLeaderGreen(String... indices) {
return ensureColor(clusterGroup.leaderCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices);
}
protected final ClusterHealthStatus ensureFollowerGreen(String... indices) {
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices);
}
private ClusterHealthStatus ensureColor(TestCluster testCluster,
ClusterHealthStatus clusterHealthStatus,
TimeValue timeout,
boolean waitForNoInitializingShards,
String... indices) {
String color = clusterHealthStatus.name().toLowerCase(Locale.ROOT);
String method = "ensure" + Strings.capitalize(color);
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(indices)
.timeout(timeout)
.waitForStatus(clusterHealthStatus)
.waitForEvents(Priority.LANGUID)
.waitForNoRelocatingShards(true)
.waitForNoInitializingShards(waitForNoInitializingShards)
.waitForNodes(Integer.toString(testCluster.size()));
ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet();
if (actionGet.isTimedOut()) {
logger.info("{} timed out, cluster state:\n{}\n{}",
method,
testCluster.client().admin().cluster().prepareState().get().getState(),
testCluster.client().admin().cluster().preparePendingClusterTasks().get());
fail("timed out waiting for " + color + " state");
}
assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
actionGet.getStatus().value(), lessThanOrEqualTo(clusterHealthStatus.value()));
logger.debug("indices {} are {}", indices.length == 0 ? "[_all]" : indices, color);
return actionGet.getStatus();
}
protected final Index resolveLeaderIndex(String index) {
GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
return new Index(index, uuid);
}
protected final Index resolveFollowerIndex(String index) {
GetIndexResponse getIndexResponse = followerClient().admin().indices().prepareGetIndex().setIndices(index).get();
assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));
String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID);
return new Index(index, uuid);
}
protected final RefreshResponse refresh(Client client, String... indices) {
RefreshResponse actionGet = client.admin().indices().prepareRefresh(indices).execute().actionGet();
assertNoFailures(actionGet);
return actionGet;
}
static class ClusterGroup implements Closeable {
final InternalTestCluster leaderCluster;
final InternalTestCluster followerCluster;
ClusterGroup(InternalTestCluster leaderCluster, InternalTestCluster followerCluster) {
this.leaderCluster = leaderCluster;
this.followerCluster = followerCluster;
}
@Override
public void close() throws IOException {
IOUtils.close(leaderCluster, followerCluster);
}
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -42,33 +42,26 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.CCRIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction.StatsRequest;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction.StatsResponses;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.io.IOException;
@ -97,126 +90,28 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
public class ShardChangesIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings(nodeOrdinal));
newSettings.put(XPackSettings.SECURITY_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MONITORING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.WATCHER_ENABLED.getKey(), false);
newSettings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
newSettings.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
return newSettings.build();
}
@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
return Arrays.asList(TestSeedPlugin.class, TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
}
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertSeqNos();
assertSameDocIdsOnShards();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
}
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
// this emulates what the CCR persistent task will do for pulling
public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
client().admin().indices().prepareCreate("index")
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.get();
client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(2L));
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(0L);
request.setMaxOperationCount(3);
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
Translog.Index operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(0L));
assertThat(operation.id(), equalTo("1"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(1L));
assertThat(operation.id(), equalTo("2"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(2L));
assertThat(operation.id(), equalTo("3"));
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get();
shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(5L));
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(3L);
request.setMaxOperationCount(3);
response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(3L));
assertThat(operation.id(), equalTo("3"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(4L));
assertThat(operation.id(), equalTo("4"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(5L));
assertThat(operation.id(), equalTo("5"));
}
public class IndexFollowingIT extends CCRIntegTestCase {
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
@ -229,18 +124,19 @@ public class ShardChangesIT extends ESIntegTestCase {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
unfollowIndex("index2");
client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
followerClient().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] secondBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
final ShardStats[] secondBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : secondBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
final long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
@ -253,28 +149,29 @@ public class ShardChangesIT extends ESIntegTestCase {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards,
firstBatchNumDocs + secondBatchNumDocs);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
public void testSyncMappings() throws Exception {
final String leaderIndexSettings = getIndexSettings(2, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
for (long i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
}
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings()
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer"));
assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue());
@ -282,36 +179,36 @@ public class ShardChangesIT extends ESIntegTestCase {
final int secondBatchNumDocs = randomIntBetween(2, 64);
for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"k\":%d}", i);
client().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
}
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits,
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs)));
mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings()
mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer"));
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 2);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 2);
}
public void testNoMappingDefined() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
ensureGreen("index1");
ensureLeaderGreen("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
leaderClient().prepareIndex("index1", "doc", "1").setSource("{\"f\":1}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
unfollowIndex("index2");
MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings()
MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("long"));
assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue());
@ -321,7 +218,7 @@ public class ShardChangesIT extends ESIntegTestCase {
int numberOfShards = between(1, 5);
String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}
@ -332,7 +229,7 @@ public class ShardChangesIT extends ESIntegTestCase {
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};
BulkProcessor bulkProcessor = BulkProcessor.builder(client(), listener)
BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient(), listener)
.setBulkActions(100)
.setConcurrentRequests(4)
.build();
@ -352,33 +249,33 @@ public class ShardChangesIT extends ESIntegTestCase {
// Waiting for some document being index before following the index:
int maxReadSize = randomIntBetween(128, 2048);
long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10));
atLeastDocsIndexed("index1", numDocsIndexed / 3);
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed / 3);
PutFollowAction.Request followRequest = follow("index1", "index2");
followRequest.getFollowRequest().setMaxBatchOperationCount(maxReadSize);
followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxWriteBufferSize(randomIntBetween(1024, 10240));
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
atLeastDocsIndexed("index1", numDocsIndexed);
atLeastDocsIndexed(leaderClient(), "index1", numDocsIndexed);
run.set(false);
thread.join();
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));
assertSameDocCount("index1", "index2");
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
client().prepareSearch("index1").get().getHits().totalHits);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfShards,
leaderClient().prepareSearch("index1").get().getHits().totalHits);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412")
public void testFollowIndexAndCloseNode() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
getFollowerCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
AtomicBoolean run = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
@ -386,7 +283,7 @@ public class ShardChangesIT extends ESIntegTestCase {
while (run.get()) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++);
try {
client().prepareIndex("index1", "doc")
leaderClient().prepareIndex("index1", "doc")
.setSource(source, XContentType.JSON)
.setTimeout(TimeValue.timeValueSeconds(1))
.get();
@ -401,33 +298,32 @@ public class ShardChangesIT extends ESIntegTestCase {
followRequest.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(32, 2048));
followRequest.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(2, 10));
followRequest.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(2, 10));
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getFollowRequest().getMaxBatchOperationCount(),
followRequest.getFollowRequest().getMaxBatchOperationCount() * 10));
long minNumDocsReplicated = maxNumDocsReplicated / 3L;
logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated);
atLeastDocsIndexed("index2", minNumDocsReplicated);
internalCluster().stopRandomNonMasterNode();
atLeastDocsIndexed(followerClient(), "index2", minNumDocsReplicated);
getFollowerCluster().stopRandomNonMasterNode();
logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated);
atLeastDocsIndexed("index2", maxNumDocsReplicated);
atLeastDocsIndexed(followerClient(), "index2", maxNumDocsReplicated);
run.set(false);
thread.join();
assertSameDocCount("index1", "index2");
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 3);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 3);
}
public void testFollowIndexWithNestedField() throws Exception {
final String leaderIndexSettings =
getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
internalCluster().ensureAtLeastNumDataNodes(2);
ensureGreen("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final int numDocs = randomIntBetween(2, 64);
for (int i = 0; i < numDocs; i++) {
@ -442,14 +338,14 @@ public class ShardChangesIT extends ESIntegTestCase {
}
builder.endArray();
builder.endObject();
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(builder).get();
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(builder).get();
}
}
for (int i = 0; i < numDocs; i++) {
int value = i;
assertBusy(() -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue(getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("field")));
assertThat(XContentMapValues.extractValue("objects.field", getResponse.getSource()),
@ -457,60 +353,63 @@ public class ShardChangesIT extends ESIntegTestCase {
});
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}
public void testUnfollowNonExistingIndex() {
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
unfollowRequest.setFollowIndex("non-existing-index");
expectThrows(IllegalArgumentException.class, () -> client().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
}
public void testFollowNonExistentIndex() throws Exception {
String indexSettings = getIndexSettings(1, 0, Collections.emptyMap());
assertAcked(client().admin().indices().prepareCreate("test-leader").setSource(indexSettings, XContentType.JSON).get());
assertAcked(client().admin().indices().prepareCreate("test-follower").setSource(indexSettings, XContentType.JSON).get());
ensureGreen("test-leader", "test-follower");
assertAcked(leaderClient().admin().indices().prepareCreate("test-leader").setSource(indexSettings, XContentType.JSON).get());
assertAcked(followerClient().admin().indices().prepareCreate("test-follower").setSource(indexSettings, XContentType.JSON).get());
ensureLeaderGreen("test-leader");
ensureFollowerGreen("test-follower");
// Leader index does not exist.
ResumeFollowAction.Request followRequest1 = resumeFollow("non-existent-leader", "test-follower");
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet());
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest1).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1))
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest1))
.actionGet());
// Follower index does not exist.
ResumeFollowAction.Request followRequest2 = resumeFollow("non-test-leader", "non-existent-follower");
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet());
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest2).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2))
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest2))
.actionGet());
// Both indices do not exist.
ResumeFollowAction.Request followRequest3 = resumeFollow("non-existent-leader", "non-existent-follower");
expectThrows(IndexNotFoundException.class, () -> client().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet());
expectThrows(IndexNotFoundException.class, () -> followerClient().execute(ResumeFollowAction.INSTANCE, followRequest3).actionGet());
expectThrows(IndexNotFoundException.class,
() -> client().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3))
() -> followerClient().execute(PutFollowAction.INSTANCE, new PutFollowAction.Request(followRequest3))
.actionGet());
}
public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final int numDocs = between(10, 1024);
logger.info("Indexing [{}] docs", numDocs);
for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
PutFollowAction.Request followRequest = follow("index1", "index2");
followRequest.getFollowRequest().setMaxBatchSize(new ByteSizeValue(1, ByteSizeUnit.BYTES));
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
@ -523,54 +422,55 @@ public class ShardChangesIT extends ESIntegTestCase {
assertBusy(assertExpectedDocumentRunnable(i));
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), 1);
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), 1, numDocs);
}
public void testDontFollowTheWrongIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index1");
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index3");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index3");
PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
followRequest = follow("index3", "index4");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
unfollowIndex("index2", "index4");
ResumeFollowAction.Request wrongRequest1 = resumeFollow("index1", "index4");
Exception e = expectThrows(IllegalArgumentException.class,
() -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet());
() -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest1).actionGet());
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
ResumeFollowAction.Request wrongRequest2 = resumeFollow("index3", "index2");
e = expectThrows(IllegalArgumentException.class, () -> client().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet());
e = expectThrows(IllegalArgumentException.class,
() -> followerClient().execute(ResumeFollowAction.INSTANCE, wrongRequest2).actionGet());
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
}
public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
ensureYellow("index1");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow("index1");
PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
unfollowIndex("index2");
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("index2");
updateSettingsRequest.settings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build());
Exception e = expectThrows(IllegalArgumentException.class,
() -> client().admin().indices().updateSettings(updateSettingsRequest).actionGet());
() -> followerClient().admin().indices().updateSettings(updateSettingsRequest).actionGet());
assertThat(e.getMessage(), equalTo("can not update internal setting [index.xpack.ccr.following_index]; " +
"this setting is managed via a dedicated API"));
}
public void testCloseLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -578,14 +478,14 @@ public class ShardChangesIT extends ESIntegTestCase {
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
leaderClient().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
@ -593,18 +493,18 @@ public class ShardChangesIT extends ESIntegTestCase {
assertThat(response.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1));
ElasticsearchException exception = response.getStatsResponses().get(0).status()
.fetchExceptions().entrySet().iterator().next().getValue().v2();
assertThat(exception.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];"));
assertThat(exception.getRootCause().getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];"));
});
client().admin().indices().open(new OpenIndexRequest("index1")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
leaderClient().admin().indices().open(new OpenIndexRequest("index1")).actionGet();
leaderClient().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
unfollowIndex("index2");
}
public void testCloseFollowIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -612,28 +512,28 @@ public class ShardChangesIT extends ESIntegTestCase {
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
leaderClient().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
});
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
unfollowIndex("index2");
}
public void testDeleteLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -641,28 +541,28 @@ public class ShardChangesIT extends ESIntegTestCase {
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
leaderClient().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
ElasticsearchException fatalException = response.getStatsResponses().get(0).status().getFatalException();
assertThat(fatalException, notNullValue());
assertThat(fatalException.getMessage(), equalTo("no such index"));
assertThat(fatalException.getRootCause().getMessage(), equalTo("no such index"));
});
unfollowIndex("index2");
ensureNoCcrTasks();
}
public void testDeleteFollowerIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
assertAcked(leaderClient().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@ -670,15 +570,15 @@ public class ShardChangesIT extends ESIntegTestCase {
.build()));
final PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
leaderClient().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
followerClient().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
leaderClient().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
@ -693,12 +593,12 @@ public class ShardChangesIT extends ESIntegTestCase {
public void testUnfollowIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
PutFollowAction.Request followRequest = follow("index1", "index2");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L));
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L));
});
// Indexing directly into index2 would fail now, because index2 is a follow index.
@ -706,27 +606,27 @@ public class ShardChangesIT extends ESIntegTestCase {
// Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index:
unfollowIndex("index2");
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
ensureGreen("index2");
followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
ensureFollowerGreen("index2");
// Indexing succeeds now, because index2 is no longer a follow index:
client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON)
followerClient().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412")
public void testFailOverOnFollower() throws Exception {
int numberOfReplicas = between(1, 2);
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(numberOfReplicas + between(1, 2));
getFollowerCluster().startMasterOnlyNode();
getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader-index");
assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("leader-index");
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
@ -736,10 +636,10 @@ public class ShardChangesIT extends ESIntegTestCase {
try {
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
client().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
leaderClient().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
client().prepareDelete("leader-index", "doc", id).get();
leaderClient().prepareDelete("leader-index", "doc", id).get();
}
} catch (NodeClosedException ignored) {
}
@ -748,19 +648,19 @@ public class ShardChangesIT extends ESIntegTestCase {
threads[i].start();
}
PutFollowAction.Request follow = follow("leader-index", "follower-index");
client().execute(PutFollowAction.INSTANCE, follow).get();
ensureGreen("follower-index");
atLeastDocsIndexed("follower-index", between(20, 60));
final ClusterState clusterState = clusterService().state();
followerClient().execute(PutFollowAction.INSTANCE, follow).get();
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(20, 60));
final ClusterState clusterState = getFollowerCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
internalCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
ensureGreen("follower-index");
atLeastDocsIndexed("follower-index", between(80, 150));
ensureFollowerGreen("follower-index");
atLeastDocsIndexed(followerClient(), "follower-index", between(80, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
@ -771,13 +671,13 @@ public class ShardChangesIT extends ESIntegTestCase {
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData taskMetadata = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
listTasksRequest.setActions(ShardFollowTask.NAME + "[c]");
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).actionGet();
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).actionGet();
assertThat(listTasksResponse.getNodeFailures().size(), equalTo(0));
assertThat(listTasksResponse.getTaskFailures().size(), equalTo(0));
@ -809,20 +709,20 @@ public class ShardChangesIT extends ESIntegTestCase {
for (String index : indices) {
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
unfollowRequest.setFollowIndex(index);
client().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}
private void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks(), empty());
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
@ -835,7 +735,7 @@ public class ShardChangesIT extends ESIntegTestCase {
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("f")));
assertThat(getResponse.getSource().get("f"), equalTo(value));
@ -934,27 +834,27 @@ public class ShardChangesIT extends ESIntegTestCase {
return settings;
}
private void atLeastDocsIndexed(String index, long numDocsReplicated) throws InterruptedException {
private void atLeastDocsIndexed(Client client, String index, long numDocsReplicated) throws InterruptedException {
logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index);
awaitBusy(() -> {
refresh(index);
refresh(client, index);
SearchRequest request = new SearchRequest(index);
request.source(new SearchSourceBuilder().size(0));
SearchResponse response = client().search(request).actionGet();
SearchResponse response = client.search(request).actionGet();
return response.getHits().getTotalHits() >= numDocsReplicated;
}, 60, TimeUnit.SECONDS);
}
private void assertSameDocCount(String index1, String index2) throws Exception {
refresh(index1);
SearchRequest request1 = new SearchRequest(index1);
private void assertSameDocCount(String leaderIndex, String followerIndex) throws Exception {
refresh(leaderClient(), leaderIndex);
SearchRequest request1 = new SearchRequest(leaderIndex);
request1.source(new SearchSourceBuilder().size(0));
SearchResponse response1 = client().search(request1).actionGet();
SearchResponse response1 = leaderClient().search(request1).actionGet();
assertBusy(() -> {
refresh(index2);
SearchRequest request2 = new SearchRequest(index2);
refresh(followerClient(), followerIndex);
SearchRequest request2 = new SearchRequest(followerIndex);
request2.source(new SearchSourceBuilder().size(0));
SearchResponse response2 = client().search(request2).actionGet();
SearchResponse response2 = followerClient().search(request2).actionGet();
assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits()));
}, 60, TimeUnit.SECONDS);
}
@ -965,9 +865,9 @@ public class ShardChangesIT extends ESIntegTestCase {
for (int i = 0; i < msuOnLeader.length; i++) {
msuOnLeader[i] = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Set<String> leaderNodes = internalCluster().nodesInclude(leaderIndex.getName());
Set<String> leaderNodes = getLeaderCluster().nodesInclude(leaderIndex.getName());
for (String leaderNode : leaderNodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, leaderNode);
IndicesService indicesService = getLeaderCluster().getInstance(IndicesService.class, leaderNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
@ -980,9 +880,9 @@ public class ShardChangesIT extends ESIntegTestCase {
}
}
Set<String> followerNodes = internalCluster().nodesInclude(followerIndex.getName());
Set<String> followerNodes = getFollowerCluster().nodesInclude(followerIndex.getName());
for (String followerNode : followerNodes) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, followerNode);
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, followerNode);
for (int i = 0; i < numberOfShards; i++) {
IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
if (shard != null) {
@ -1001,8 +901,8 @@ public class ShardChangesIT extends ESIntegTestCase {
assertBusy(() -> {
long[] numOfOptimizedOps = new long[numberOfShards];
for (int shardId = 0; shardId < numberOfShards; shardId++) {
for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
for (String node : getFollowerCluster().nodesInclude(followerIndex.getName())) {
IndicesService indicesService = getFollowerCluster().getInstance(IndicesService.class, node);
IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
if (shard != null && shard.routingEntry().primary()) {
try {
@ -1024,7 +924,7 @@ public class ShardChangesIT extends ESIntegTestCase {
public static ResumeFollowAction.Request resumeFollow(String leaderIndex, String followerIndex) {
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderIndex(leaderIndex);
request.setLeaderIndex("leader_cluster:" + leaderIndex);
request.setFollowerIndex(followerIndex);
request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
request.setPollTimeout(TimeValue.timeValueMillis(10));

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -14,31 +15,23 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.CCRIntegTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class AutoFollowTests extends ESSingleNodeTestCase {
public class AutoFollowIT extends CCRIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(LocalStateCcr.class);
}
@Override
protected boolean resetNodeAfterTest() {
return true;
protected boolean reuseClusters() {
return false;
}
public void testAutoFollow() throws Exception {
@ -48,31 +41,31 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createIndex("logs-201812", leaderIndexSettings, "_doc");
createLeaderIndex("logs-201812", leaderIndexSettings);
// Enabling auto following:
putAutoFollowPatterns("logs-*", "transactions-*");
createIndex("metrics-201901", leaderIndexSettings, "_doc");
createLeaderIndex("metrics-201901", leaderIndexSettings);
createIndex("logs-201901", leaderIndexSettings, "_doc");
createLeaderIndex("logs-201901", leaderIndexSettings);
assertBusy(() -> {
IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901");
assertTrue(client().admin().indices().exists(request).actionGet().isExists());
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());
});
createIndex("transactions-201901", leaderIndexSettings, "_doc");
createLeaderIndex("transactions-201901", leaderIndexSettings);
assertBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(2L));
IndicesExistsRequest request = new IndicesExistsRequest("copy-transactions-201901");
assertTrue(client().admin().indices().exists(request).actionGet().isExists());
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());
});
IndicesExistsRequest request = new IndicesExistsRequest("copy-metrics-201901");
assertFalse(client().admin().indices().exists(request).actionGet().isExists());
assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists());
request = new IndicesExistsRequest("copy-logs-201812");
assertFalse(client().admin().indices().exists(request).actionGet().isExists());
assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists());
}
public void testAutoFollowManyIndices() throws Exception {
@ -85,7 +78,7 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
putAutoFollowPatterns("logs-*");
int numIndices = randomIntBetween(4, 32);
for (int i = 0; i < numIndices; i++) {
createIndex("logs-" + i, leaderIndexSettings, "_doc");
createLeaderIndex("logs-" + i, leaderIndexSettings);
}
int expectedVal1 = numIndices;
assertBusy(() -> {
@ -94,17 +87,17 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
});
deleteAutoFollowPatternSetting();
createIndex("logs-does-not-count", leaderIndexSettings, "_doc");
createLeaderIndex("logs-does-not-count", leaderIndexSettings);
putAutoFollowPatterns("logs-*");
int i = numIndices;
numIndices = numIndices + randomIntBetween(4, 32);
for (; i < numIndices; i++) {
createIndex("logs-" + i, leaderIndexSettings, "_doc");
createLeaderIndex("logs-" + i, leaderIndexSettings);
}
int expectedVal2 = numIndices;
assertBusy(() -> {
MetaData metaData = client().admin().cluster().prepareState().get().getState().metaData();
MetaData metaData = followerClient().admin().cluster().prepareState().get().getState().metaData();
int count = (int) Arrays.stream(metaData.getConcreteAllIndices()).filter(s -> s.startsWith("copy-")).count();
assertThat(count, equalTo(expectedVal2));
});
@ -119,7 +112,7 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
// Enabling auto following:
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setLeaderClusterAlias("_local_");
request.setLeaderClusterAlias("leader_cluster");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
// Need to set this, because following an index in the same cluster
request.setFollowIndexNamePattern("copy-{{leader_index}}");
@ -144,12 +137,12 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
if (randomBoolean()) {
request.setPollTimeout(TimeValue.timeValueMillis(500));
}
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
createIndex("logs-201901", leaderIndexSettings, "_doc");
createLeaderIndex("logs-201901", leaderIndexSettings);
assertBusy(() -> {
PersistentTasksCustomMetaData persistentTasksMetaData =
client().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
followerClient().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(persistentTasksMetaData, notNullValue());
assertThat(persistentTasksMetaData.tasks().size(), equalTo(1));
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams();
@ -181,22 +174,28 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
private void putAutoFollowPatterns(String... patterns) {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setLeaderClusterAlias("_local_");
request.setLeaderClusterAlias("leader_cluster");
request.setLeaderIndexPatterns(Arrays.asList(patterns));
// Need to set this, because following an index in the same cluster
request.setFollowIndexNamePattern("copy-{{leader_index}}");
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
assertTrue(followerClient().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
}
private void deleteAutoFollowPatternSetting() {
DeleteAutoFollowPatternAction.Request request = new DeleteAutoFollowPatternAction.Request();
request.setLeaderClusterAlias("_local_");
assertTrue(client().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
request.setLeaderClusterAlias("leader_cluster");
assertTrue(followerClient().execute(DeleteAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
}
private AutoFollowStats getAutoFollowStats() {
AutoFollowStatsAction.Request request = new AutoFollowStatsAction.Request();
return client().execute(AutoFollowStatsAction.INSTANCE, request).actionGet().getStats();
return followerClient().execute(AutoFollowStatsAction.INSTANCE, request).actionGet().getStats();
}
private void createLeaderIndex(String index, Settings settings) {
CreateIndexRequest request = new CreateIndexRequest(index);
request.settings(settings);
leaderClient().admin().indices().create(request).actionGet();
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import java.util.Collection;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class ShardChangesTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(LocalStateCcr.class);
}
// this emulates what the CCR persistent task will do for pulling
public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
client().admin().indices().prepareCreate("index")
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.get();
client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
ShardStats shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(2L));
String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY);
ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(0L);
request.setMaxOperationCount(3);
ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
Translog.Index operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(0L));
assertThat(operation.id(), equalTo("1"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(1L));
assertThat(operation.id(), equalTo("2"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(2L));
assertThat(operation.id(), equalTo("3"));
client().prepareIndex("index", "doc", "3").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "4").setSource("{}", XContentType.JSON).get();
client().prepareIndex("index", "doc", "5").setSource("{}", XContentType.JSON).get();
shardStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards()[0];
globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint();
assertThat(globalCheckPoint, equalTo(5L));
request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID);
request.setFromSeqNo(3L);
request.setMaxOperationCount(3);
response = client().execute(ShardChangesAction.INSTANCE, request).get();
assertThat(response.getOperations().length, equalTo(3));
operation = (Translog.Index) response.getOperations()[0];
assertThat(operation.seqNo(), equalTo(3L));
assertThat(operation.id(), equalTo("3"));
operation = (Translog.Index) response.getOperations()[1];
assertThat(operation.seqNo(), equalTo(4L));
assertThat(operation.id(), equalTo("4"));
operation = (Translog.Index) response.getOperations()[2];
assertThat(operation.seqNo(), equalTo(5L));
assertThat(operation.id(), equalTo("5"));
}
}

View File

@ -16,7 +16,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.ShardChangesIT;
import org.elasticsearch.xpack.ccr.IndexFollowingIT;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
@ -35,12 +35,12 @@ public class TransportResumeFollowActionTests extends ESTestCase {
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid");
customMetaData.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, "_na_");
ResumeFollowAction.Request request = ShardChangesIT.resumeFollow("index1", "index2");
ResumeFollowAction.Request request = IndexFollowingIT.resumeFollow("index1", "index2");
String[] UUIDs = new String[]{"uuid"};
{
// should fail, because leader index does not exist
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null, null));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not exist"));
}
{
// should fail, because follow index does not exist
@ -83,7 +83,7 @@ public class TransportResumeFollowActionTests extends ESTestCase {
IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, null);
IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData);
Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
assertThat(e.getMessage(), equalTo("leader index [leader_cluster:index1] does not have soft deletes enabled"));
}
{
// should fail because the number of primary shards between leader and follow index are not equal

View File

@ -371,7 +371,9 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new),
// ccr
new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new)
new NamedWriteableRegistry.Entry(MetaData.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, AutoFollowMetadata.TYPE,
in -> AutoFollowMetadata.readDiffFrom(MetaData.Custom.class, AutoFollowMetadata.TYPE, in))
);
}