[CCR] Change AutofollowCoordinator to use wait_for_metadata_version (#36264)

Changed AutofollowCoordinator makes use of the wait_for_metadata_version
feature in cluster state API and removed hard coded poll interval.

Originates from #35895
Relates to #33007
This commit is contained in:
Martijn van Groningen 2018-12-12 12:47:24 +01:00 committed by GitHub
parent c6de68c3b9
commit 883940ad92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 183 additions and 55 deletions

View File

@ -156,7 +156,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
return Arrays.asList( return Arrays.asList(
ccrLicenseChecker, ccrLicenseChecker,
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
); );
} }

View File

@ -121,8 +121,9 @@ public final class CcrLicenseChecker {
client.getRemoteClusterClient(clusterAlias), client.getRemoteClusterClient(clusterAlias),
request, request,
onFailure, onFailure,
leaderClusterState -> { remoteClusterStateResponse -> {
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); ClusterState remoteClusterState = remoteClusterStateResponse.getState();
IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex);
if (leaderIndexMetaData == null) { if (leaderIndexMetaData == null) {
onFailure.accept(new IndexNotFoundException(leaderIndex)); onFailure.accept(new IndexNotFoundException(leaderIndex));
return; return;
@ -159,7 +160,7 @@ public final class CcrLicenseChecker {
final String clusterAlias, final String clusterAlias,
final ClusterStateRequest request, final ClusterStateRequest request,
final Consumer<Exception> onFailure, final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) { final Consumer<ClusterStateResponse> leaderClusterStateConsumer) {
try { try {
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias)); Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
checkRemoteClusterLicenseAndFetchClusterState( checkRemoteClusterLicenseAndFetchClusterState(
@ -199,7 +200,7 @@ public final class CcrLicenseChecker {
final Client remoteClient, final Client remoteClient,
final ClusterStateRequest request, final ClusterStateRequest request,
final Consumer<Exception> onFailure, final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer, final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense, final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense) { final Function<Exception, ElasticsearchStatusException> unknownLicense) {
// we have to check the license on the remote cluster // we have to check the license on the remote cluster
@ -211,7 +212,7 @@ public final class CcrLicenseChecker {
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) { if (licenseCheck.isSuccess()) {
final ActionListener<ClusterStateResponse> clusterStateListener = final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata // following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(request, clusterStateListener); remoteClient.admin().cluster().state(request, clusterStateListener);
} else { } else {

View File

@ -14,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -26,13 +27,11 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
@ -66,7 +65,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
private final Client client; private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker; private final CcrLicenseChecker ccrLicenseChecker;
@ -80,11 +78,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {
public AutoFollowCoordinator( public AutoFollowCoordinator(
Client client, Client client,
ThreadPool threadPool,
ClusterService clusterService, ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) { CcrLicenseChecker ccrLicenseChecker) {
this.client = client; this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
clusterService.addListener(this); clusterService.addListener(this);
@ -150,22 +146,24 @@ public class AutoFollowCoordinator implements ClusterStateListener {
Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size()); Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) { for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
@Override @Override
void getRemoteClusterState(final String remoteCluster, void getRemoteClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) { final long metadataVersion,
final BiConsumer<ClusterStateResponse, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest(); final ClusterStateRequest request = new ClusterStateRequest();
request.clear(); request.clear();
request.metaData(true); request.metaData(true);
request.routingTable(true); request.routingTable(true);
request.waitForMetaDataVersion(metadataVersion);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client, client,
remoteCluster, remoteCluster,
request, request,
e -> handler.accept(null, e), e -> handler.accept(null, e),
remoteClusterState -> handler.accept(remoteClusterState, null)); remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null));
} }
@Override @Override
@ -239,19 +237,17 @@ public class AutoFollowCoordinator implements ClusterStateListener {
abstract static class AutoFollower { abstract static class AutoFollower {
private final String remoteCluster; private final String remoteCluster;
private final ThreadPool threadPool;
private final Consumer<List<AutoFollowResult>> statsUpdater; private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier; private final Supplier<ClusterState> followerClusterStateSupplier;
private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown; private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults; private volatile AtomicArray<AutoFollowResult> autoFollowResults;
AutoFollower(final String remoteCluster, AutoFollower(final String remoteCluster,
final ThreadPool threadPool,
final Consumer<List<AutoFollowResult>> statsUpdater, final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) { final Supplier<ClusterState> followerClusterStateSupplier) {
this.remoteCluster = remoteCluster; this.remoteCluster = remoteCluster;
this.threadPool = threadPool;
this.statsUpdater = statsUpdater; this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier; this.followerClusterStateSupplier = followerClusterStateSupplier;
} }
@ -276,9 +272,15 @@ public class AutoFollowCoordinator implements ClusterStateListener {
this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size());
getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> { getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
if (remoteClusterState != null) { if (remoteClusterStateResponse != null) {
assert remoteError == null; assert remoteError == null;
if (remoteClusterStateResponse.isWaitForTimedOut()) {
start();
return;
}
ClusterState remoteClusterState = remoteClusterStateResponse.getState();
metadataVersion = remoteClusterState.metaData().version();
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
} else { } else {
assert remoteError != null; assert remoteError != null;
@ -402,8 +404,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
autoFollowResults.set(slot, result); autoFollowResults.set(slot, result);
if (autoFollowPatternsCountDown.countDown()) { if (autoFollowPatternsCountDown.countDown()) {
statsUpdater.accept(autoFollowResults.asList()); statsUpdater.accept(autoFollowResults.asList());
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: start();
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
} }
} }
@ -525,13 +526,15 @@ public class AutoFollowCoordinator implements ClusterStateListener {
} }
/** /**
* Fetch the cluster state from the leader with the specified cluster alias * Fetch a remote cluster state from with the specified cluster alias
* @param remoteCluster the name of the leader cluster * @param remoteCluster the name of the leader cluster
* @param metadataVersion the last seen metadata version
* @param handler the callback to invoke * @param handler the callback to invoke
*/ */
abstract void getRemoteClusterState( abstract void getRemoteClusterState(
String remoteCluster, String remoteCluster,
BiConsumer<ClusterState, Exception> handler long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler
); );
abstract void createAndFollow( abstract void createAndFollow(

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -80,7 +81,7 @@ public class TransportPutAutoFollowPatternAction extends
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Consumer<ClusterState> consumer = remoteClusterState -> { Consumer<ClusterStateResponse> consumer = remoteClusterState -> {
String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
if (e == null) { if (e == null) {
@ -94,7 +95,7 @@ public class TransportPutAutoFollowPatternAction extends
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
return innerPut(request, filteredHeaders, currentState, remoteClusterState); return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState());
} }
}); });
} else { } else {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -22,7 +23,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -46,13 +47,12 @@ import java.util.function.Supplier;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -60,7 +60,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testAutoFollower() { public void testAutoFollower() {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
@ -90,12 +89,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), nullValue()); assertThat(entries.get(0).getValue(), nullValue());
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(currentState)) { AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState)) {
@Override @Override
void getRemoteClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote")); assertThat(remoteCluster, equalTo("remote"));
handler.accept(remoteState, null); handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null);
} }
@Override @Override
@ -131,7 +131,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testAutoFollowerClusterStateApiFailure() { public void testAutoFollowerClusterStateApiFailure() {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
@ -155,10 +154,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).clusterStateFetchException, sameInstance(failure));
assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getRemoteClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
handler.accept(null, failure); handler.accept(null, failure);
} }
@ -182,7 +182,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testAutoFollowerUpdateClusterStateFailure() { public void testAutoFollowerUpdateClusterStateFailure() {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
@ -210,11 +209,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure)); assertThat(entries.get(0).getValue(), sameInstance(failure));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getRemoteClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { long metadataVersion,
handler.accept(remoteState, null); BiConsumer<ClusterStateResponse, Exception> handler) {
handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null);
} }
@Override @Override
@ -239,7 +239,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testAutoFollowerCreateAndFollowApiCallFailure() { public void testAutoFollowerCreateAndFollowApiCallFailure() {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState remoteState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
@ -267,11 +266,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure)); assertThat(entries.get(0).getValue(), sameInstance(failure));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getRemoteClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { long metadataVersion,
handler.accept(remoteState, null); BiConsumer<ClusterStateResponse, Exception> handler) {
handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null);
} }
@Override @Override
@ -530,7 +530,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
public void testStats() { public void testStats() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
null,
null, null,
mock(ClusterService.class), mock(ClusterService.class),
new CcrLicenseChecker(() -> true, () -> false) new CcrLicenseChecker(() -> true, () -> false)
@ -586,6 +585,122 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
} }
public void testWaitForMetadataVersion() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
final LinkedList<ClusterState> leaderStates = new LinkedList<>();
ClusterState[] states = new ClusterState[16];
for (int i = 0; i < states.length; i++) {
states[i] = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
String indexName = "logs-" + i;
leaderStates.add(i == 0 ? createRemoteClusterState(indexName) : createRemoteClusterState(leaderStates.get(i - 1), indexName));
}
List<AutoFollowCoordinator.AutoFollowResult> allResults = new ArrayList<>();
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = allResults::addAll;
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) {
long previousRequestedMetadataVersion = 0;
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote"));
assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion));
handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderStates.poll(), 1L, false), null);
}
@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
successHandler.run();
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
handler.accept(null);
}
};
autoFollower.start();
assertThat(allResults.size(), equalTo(states.length));
for (int i = 0; i < states.length; i++) {
assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true));
}
}
public void testWaitForTimeOut() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null);
Map<String, AutoFollowPattern> patterns = new HashMap<>();
patterns.put("remote", autoFollowPattern);
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
ClusterState[] states = new ClusterState[16];
for (int i = 0; i < states.length; i++) {
states[i] = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
}
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results -> {
fail("should not be invoked");
};
AtomicInteger counter = new AtomicInteger();
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) {
long previousRequestedMetadataVersion = 0;
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
counter.incrementAndGet();
assertThat(remoteCluster, equalTo("remote"));
assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion));
handler.accept(new ClusterStateResponse(new ClusterName("name"), null, 1L, true), null);
}
@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
fail("should not be invoked");
}
@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
Consumer<Exception> handler) {
fail("should not be invoked");
}
};
autoFollower.start();
assertThat(counter.get(), equalTo(states.length));
}
private static ClusterState createRemoteClusterState(String indexName) { private static ClusterState createRemoteClusterState(String indexName) {
IndexMetaData indexMetaData = IndexMetaData.builder(indexName) IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
@ -603,6 +718,25 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
return csBuilder.build(); return csBuilder.build();
} }
private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) {
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.build();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder(previous.metaData())
.version(previous.metaData().version() + 1)
.put(indexMetaData, true));
ShardRouting shardRouting =
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
return csBuilder.build();
}
private static Supplier<ClusterState> localClusterStateSupplier(ClusterState... states) { private static Supplier<ClusterState> localClusterStateSupplier(ClusterState... states) {
final AutoFollowMetadata emptyAutoFollowMetadata = final AutoFollowMetadata emptyAutoFollowMetadata =
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
@ -620,15 +754,4 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
}; };
} }
private static ThreadPool mockThreadPool() {
ThreadPool threadPool = mock(ThreadPool.class);
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Runnable task = (Runnable) args[2];
task.run();
return null;
}).when(threadPool).schedule(any(), anyString(), any());
return threadPool;
}
} }