Retry follow task when remote connection queue full (#55314)

If more than 100 shard-follow tasks are trying to connect to the remote 
cluster, then some of them will abort with "connect listener queue is 
full". This is because we retry on ESRejectedExecutionException, but not
on RejectedExecutionException.
This commit is contained in:
Nhat Nguyen 2020-04-17 00:10:56 -04:00
parent 9e3b813b62
commit 3cc4e0dd09
10 changed files with 72 additions and 46 deletions

View File

@ -98,27 +98,28 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
clusterAlias,
transportService,
connectionManager,
settings,
REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
PROXY_ADDRESS.getConcreteSettingForNamespace(clusterAlias).get(settings),
SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
Settings settings, int maxNumConnections, String configuredAddress) {
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), null);
}
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, String configuredServerName) {
this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress,
Settings settings, int maxNumConnections, String configuredAddress, String configuredServerName) {
this(clusterAlias, transportService, connectionManager, settings, maxNumConnections, configuredAddress,
() -> resolveAddress(configuredAddress), configuredServerName);
}
ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
Settings settings, int maxNumConnections, String configuredAddress, Supplier<TransportAddress> address,
String configuredServerName) {
super(clusterAlias, transportService, connectionManager);
super(clusterAlias, transportService, connectionManager, settings);
this.maxNumConnections = maxNumConnections;
this.configuredAddress = configuredAddress;
this.configuredServerName = configuredServerName;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
@ -48,7 +49,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -105,10 +105,14 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
Setting.Property.NodeScope,
Setting.Property.Dynamic));
// this setting is intentionally not registered, it is only used in tests
public static final Setting<Integer> REMOTE_MAX_PENDING_CONNECTION_LISTENERS =
Setting.intSetting("cluster.remote.max_pending_connection_listeners", 1000, Setting.Property.NodeScope);
private final int maxPendingConnectionListeners;
private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class);
private static final int MAX_LISTENERS = 100;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Object mutex = new Object();
private List<ActionListener<Void>> listeners = new ArrayList<>();
@ -117,10 +121,12 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
protected final RemoteConnectionManager connectionManager;
protected final String clusterAlias;
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager) {
RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
Settings settings) {
this.clusterAlias = clusterAlias;
this.transportService = transportService;
this.connectionManager = connectionManager;
this.maxPendingConnectionListeners = REMOTE_MAX_PENDING_CONNECTION_LISTENERS.get(settings);
connectionManager.addListener(this);
}
@ -237,9 +243,9 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
if (closed) {
assert listeners.isEmpty();
} else {
if (listeners.size() >= MAX_LISTENERS) {
assert listeners.size() == MAX_LISTENERS;
listener.onFailure(new RejectedExecutionException("connect listener queue is full"));
if (listeners.size() >= maxPendingConnectionListeners) {
assert listeners.size() == maxPendingConnectionListeners;
listener.onFailure(new EsRejectedExecutionException("connect listener queue is full"));
return;
} else {
listeners.add(listener);

View File

@ -220,23 +220,24 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
transportService,
connectionManager,
REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(settings),
settings,
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
getNodePredicate(settings),
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings));
}
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
List<String> configuredSeedNodes) {
this(clusterAlias, transportService, connectionManager, proxyAddress, maxNumRemoteConnections, nodePredicate, configuredSeedNodes,
configuredSeedNodes.stream().map(seedAddress ->
this(clusterAlias, transportService, connectionManager, proxyAddress, settings, maxNumRemoteConnections, nodePredicate,
configuredSeedNodes, configuredSeedNodes.stream().map(seedAddress ->
(Supplier<DiscoveryNode>) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress)).collect(Collectors.toList()));
}
SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
String proxyAddress, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
String proxyAddress, Settings settings, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
List<String> configuredSeedNodes, List<Supplier<DiscoveryNode>> seedNodes) {
super(clusterAlias, transportService, connectionManager);
super(clusterAlias, transportService, connectionManager, settings);
this.proxyAddress = proxyAddress;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;

View File

@ -94,7 +94,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString())) {
Settings.EMPTY, numOfConnections, address1.toString())) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
@ -126,7 +126,8 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
Settings.EMPTY, numOfConnections, address1.toString(),
alternatingResolver(address1, address2, useAddress1), null)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
@ -173,7 +174,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString())) {
Settings.EMPTY, numOfConnections, address1.toString())) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
@ -206,7 +207,8 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address1.toString(), alternatingResolver(address1, address2, useAddress1), null)) {
Settings.EMPTY, numOfConnections, address1.toString(),
alternatingResolver(address1, address2, useAddress1), null)) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2)));
@ -255,7 +257,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address.toString(), addressSupplier, null)) {
Settings.EMPTY, numOfConnections, address.toString(), addressSupplier, null)) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -280,7 +282,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, remoteAddress.toString(), "server-name")) {
Settings.EMPTY, numOfConnections, remoteAddress.toString(), "server-name")) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -373,7 +375,7 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
int numOfConnections = randomIntBetween(4, 8);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
numOfConnections, address, "localhost")) {
Settings.EMPTY, numOfConnections, address, "localhost")) {
assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1)));
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();

View File

@ -91,7 +91,7 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
FakeConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager,
RemoteConnectionStrategy.ConnectionStrategy strategy) {
super(clusterAlias, transportService, connectionManager);
super(clusterAlias, transportService, connectionManager, Settings.EMPTY);
this.strategy = strategy;
}

View File

@ -128,7 +128,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -165,7 +165,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode), Collections.singletonList(seedNodeSupplier))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode), Collections.singletonList(seedNodeSupplier))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -201,7 +201,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 2, n -> true, seedNodes(seedNode))) {
null, Settings.EMPTY, 2, n -> true, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -247,7 +247,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -276,7 +276,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(incompatibleSeedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(incompatibleSeedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
@ -306,7 +306,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> n.equals(rejectedNode) == false, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> n.equals(rejectedNode) == false, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -341,7 +341,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> n.equals(seedNode) == false, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> n.equals(seedNode) == false, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
final IllegalStateException ise = expectThrows(IllegalStateException.class, connectFuture::actionGet);
@ -378,7 +378,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode, otherSeedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode, otherSeedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();
@ -426,7 +426,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode))) {
assertFalse(connectionManager.nodeConnected(seedNode));
assertFalse(connectionManager.nodeConnected(discoverableNode));
assertTrue(strategy.assertNoRunningConnections());
@ -496,7 +496,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
proxyAddress.toString(), 3, n -> true, seedNodes)) {
proxyAddress.toString(), Settings.EMPTY, 3, n -> true, seedNodes)) {
assertFalse(connectionManager.nodeConnected(unaddressableSeedNode));
assertFalse(connectionManager.nodeConnected(discoverableNode));
assertTrue(strategy.assertNoRunningConnections());
@ -533,7 +533,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
SniffConnectionStrategy strategy = new SniffConnectionStrategy(clusterAlias, localService, remoteConnectionManager,
null, 3, n -> true, seedNodes(seedNode))) {
null, Settings.EMPTY, 3, n -> true, seedNodes(seedNode))) {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
strategy.connect(connectFuture);
connectFuture.actionGet();

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import java.util.Arrays;
import java.util.List;
@ -50,6 +51,7 @@ public final class InternalSettingsPlugin extends Plugin {
INDEX_CREATION_DATE_SETTING,
PROVIDED_NAME_SETTING,
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
RemoteConnectionStrategy.REMOTE_MAX_PENDING_CONNECTION_LISTENERS,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING,

View File

@ -76,6 +76,7 @@ import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
import org.elasticsearch.xpack.ccr.CcrSettings;
@ -133,7 +134,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
}
protected Settings followerClusterSettings() {
return Settings.EMPTY;
final Settings.Builder builder = Settings.builder();
if (randomBoolean()) {
builder.put(RemoteConnectionStrategy.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), randomIntBetween(1, 100));
}
return builder.build();
}
@Before

View File

@ -106,6 +106,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
@Override
protected Settings followerClusterSettings() {
return Settings.builder()
.put(super.followerClusterSettings())
.put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
.build();
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.RemoteConnectionStrategy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@ -34,16 +35,28 @@ public class RestartIndexFollowingIT extends CcrIntegTestCase {
return false;
}
@Override
protected Settings followerClusterSettings() {
final Settings.Builder settings = Settings.builder().put(super.followerClusterSettings());
if (randomBoolean()) {
settings.put(RemoteConnectionStrategy.REMOTE_MAX_PENDING_CONNECTION_LISTENERS.getKey(), 1);
}
return settings.build();
}
public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0);
final String leaderIndexSettings = getIndexSettings(randomIntBetween(1, 10), 0);
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1");
setupRemoteCluster();
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
if (randomBoolean()) {
followRequest.getParameters().setMaxReadRequestOperationCount(randomIntBetween(5, 10));
}
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
final long firstBatchNumDocs = randomIntBetween(10, 200);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
@ -57,23 +70,18 @@ public class RestartIndexFollowingIT extends CcrIntegTestCase {
getFollowerCluster().fullRestart();
ensureFollowerGreen("index2");
final long secondBatchNumDocs = randomIntBetween(2, 64);
final long secondBatchNumDocs = randomIntBetween(10, 200);
for (int i = 0; i < secondBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}
assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs));
});
cleanRemoteCluster();
getLeaderCluster().fullRestart();
ensureLeaderGreen("index1");
// Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted:
setupRemoteCluster();
final long thirdBatchNumDocs = randomIntBetween(2, 64);
final long thirdBatchNumDocs = randomIntBetween(10, 200);
for (int i = 0; i < thirdBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}