diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/AutoFollowStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/AutoFollowStats.java index 09b57e68ff5..b442336ca4d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/AutoFollowStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/AutoFollowStats.java @@ -39,6 +39,10 @@ public final class AutoFollowStats { static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); static final ParseField LEADER_INDEX = new ParseField("leader_index"); static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); + static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters"); + static final ParseField CLUSTER_NAME = new ParseField("cluster_name"); + static final ParseField TIME_SINCE_LAST_CHECK_MILLIS = new ParseField("time_since_last_check_millis"); + static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version"); @SuppressWarnings("unchecked") static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", @@ -48,6 +52,10 @@ public final class AutoFollowStats { (Long) args[2], new TreeMap<>( ((List>) args[3]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + new TreeMap<>( + ((List>) args[4]) .stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) )); @@ -57,6 +65,11 @@ public final class AutoFollowStats { "auto_follow_stats_errors", args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1])); + private static final ConstructingObjectParser, Void> AUTO_FOLLOWED_CLUSTERS_PARSER = + new ConstructingObjectParser<>( + "auto_followed_clusters", + args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2]))); + static { AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject( @@ -64,26 +77,35 @@ public final class AutoFollowStats { (p, c) -> ElasticsearchException.fromXContent(p), AUTO_FOLLOW_EXCEPTION); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_CHECK_MILLIS); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION); + STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER, + AUTO_FOLLOWED_CLUSTERS); } private final long numberOfFailedFollowIndices; private final long numberOfFailedRemoteClusterStateRequests; private final long numberOfSuccessfulFollowIndices; private final NavigableMap recentAutoFollowErrors; + private final NavigableMap autoFollowedClusters; AutoFollowStats(long numberOfFailedFollowIndices, long numberOfFailedRemoteClusterStateRequests, long numberOfSuccessfulFollowIndices, - NavigableMap recentAutoFollowErrors) { + NavigableMap recentAutoFollowErrors, + NavigableMap autoFollowedClusters) { this.numberOfFailedFollowIndices = numberOfFailedFollowIndices; this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices; this.recentAutoFollowErrors = recentAutoFollowErrors; + this.autoFollowedClusters = autoFollowedClusters; } public long getNumberOfFailedFollowIndices() { @@ -102,4 +124,27 @@ public final class AutoFollowStats { return recentAutoFollowErrors; } + public NavigableMap getAutoFollowedClusters() { + return autoFollowedClusters; + } + + public static class AutoFollowedCluster { + + private final long timeSinceLastCheckMillis; + private final long lastSeenMetadataVersion; + + public AutoFollowedCluster(long timeSinceLastCheckMillis, long lastSeenMetadataVersion) { + this.timeSinceLastCheckMillis = timeSinceLastCheckMillis; + this.lastSeenMetadataVersion = lastSeenMetadataVersion; + } + + public long getTimeSinceLastCheckMillis() { + return timeSinceLastCheckMillis; + } + + public long getLastSeenMetadataVersion() { + return lastSeenMetadataVersion; + } + } + } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java index 039e31151c4..8d53b5cde08 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.ccr; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.client.ccr.AutoFollowStats.AutoFollowedCluster; import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -185,6 +186,19 @@ public class CcrStatsResponseTests extends ESTestCase { builder.endObject(); } builder.endArray(); + builder.startArray(AutoFollowStats.AUTO_FOLLOWED_CLUSTERS.getPreferredName()); + for (Map.Entry entry : autoFollowStats.getAutoFollowedClusters().entrySet()) { + builder.startObject(); + { + builder.field(AutoFollowStats.CLUSTER_NAME.getPreferredName(), entry.getKey()); + builder.field(AutoFollowStats.TIME_SINCE_LAST_CHECK_MILLIS.getPreferredName(), + entry.getValue().getTimeSinceLastCheckMillis()); + builder.field(AutoFollowStats.LAST_SEEN_METADATA_VERSION.getPreferredName(), + entry.getValue().getLastSeenMetadataVersion()); + } + builder.endObject(); + } + builder.endArray(); } builder.endObject(); @@ -315,11 +329,16 @@ public class CcrStatsResponseTests extends ESTestCase { for (int i = 0; i < count; i++) { readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]"))); } + final NavigableMap autoFollowClusters = new TreeMap<>(); + for (int i = 0; i < count; i++) { + autoFollowClusters.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong())); + } return new AutoFollowStats( randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - readExceptions + readExceptions, + autoFollowClusters ); } diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index b8491e8a601..d849a99c459 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -105,7 +105,8 @@ The API returns the following results: "number_of_failed_follow_indices" : 0, "number_of_failed_remote_cluster_state_requests" : 0, "number_of_successful_follow_indices" : 1, - "recent_auto_follow_errors" : [] + "recent_auto_follow_errors" : [], + "auto_followed_clusters" : [] }, "follow_stats" : { "indices" : [ @@ -151,6 +152,7 @@ The API returns the following results: // TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/] // TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/] // TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/] +// TESTRESPONSE[s/"auto_followed_clusters" : \[\]/"auto_followed_clusters" : $body.auto_follow_stats.auto_followed_clusters/] // TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/] // TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/] // TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/] diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b25bd71c67f..70d4905d943 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -156,7 +156,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 7900351105c..4888b0367fd 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -52,9 +52,12 @@ import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster; + /** * A component that runs only on the elected master node and follows leader indices automatically * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. @@ -67,6 +70,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final Client client; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; + private final LongSupplier relativeMillisTimeProvider; private volatile Map autoFollowers = Collections.emptyMap(); @@ -79,10 +83,13 @@ public class AutoFollowCoordinator implements ClusterStateListener { public AutoFollowCoordinator( Client client, ClusterService clusterService, - CcrLicenseChecker ccrLicenseChecker) { + CcrLicenseChecker ccrLicenseChecker, + LongSupplier relativeMillisTimeProvider) { + this.client = client; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); + this.relativeMillisTimeProvider = relativeMillisTimeProvider; clusterService.addListener(this); this.recentAutoFollowErrors = new LinkedHashMap() { @Override @@ -93,11 +100,26 @@ public class AutoFollowCoordinator implements ClusterStateListener { } public synchronized AutoFollowStats getStats() { + final Map autoFollowers = this.autoFollowers; + final TreeMap timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>(); + for (Map.Entry entry : autoFollowers.entrySet()) { + long lastAutoFollowTimeInMillis = entry.getValue().lastAutoFollowTimeInMillis; + long lastSeenMetadataVersion = entry.getValue().metadataVersion; + if (lastAutoFollowTimeInMillis != -1) { + long timeSinceLastCheckInMillis = relativeMillisTimeProvider.getAsLong() - lastAutoFollowTimeInMillis; + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), + new AutoFollowedCluster(timeSinceLastCheckInMillis, lastSeenMetadataVersion)); + } else { + timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), new AutoFollowedCluster(-1L, lastSeenMetadataVersion)); + } + } + return new AutoFollowStats( numberOfFailedIndicesAutoFollowed, numberOfFailedRemoteClusterStateRequests, numberOfSuccessfulIndicesAutoFollowed, - new TreeMap<>(recentAutoFollowErrors) + new TreeMap<>(recentAutoFollowErrors), + timesSinceLastAutoFollowPerRemoteCluster ); } @@ -146,7 +168,8 @@ public class AutoFollowCoordinator implements ClusterStateListener { Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); for (String remoteCluster : newRemoteClusters) { - AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) { + AutoFollower autoFollower = + new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) { @Override void getRemoteClusterState(final String remoteCluster, @@ -239,20 +262,25 @@ public class AutoFollowCoordinator implements ClusterStateListener { private final String remoteCluster; private final Consumer> statsUpdater; private final Supplier followerClusterStateSupplier; + private final LongSupplier relativeTimeProvider; + private volatile long lastAutoFollowTimeInMillis = -1; private volatile long metadataVersion = 0; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; AutoFollower(final String remoteCluster, final Consumer> statsUpdater, - final Supplier followerClusterStateSupplier) { + final Supplier followerClusterStateSupplier, + LongSupplier relativeTimeProvider) { this.remoteCluster = remoteCluster; this.statsUpdater = statsUpdater; this.followerClusterStateSupplier = followerClusterStateSupplier; + this.relativeTimeProvider = relativeTimeProvider; } void start() { + lastAutoFollowTimeInMillis = relativeTimeProvider.getAsLong(); final ClusterState clusterState = followerClusterStateSupplier.get(); final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 534397a0a9a..7228acaacf1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -89,7 +89,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) { @Override void getRemoteClusterState(String remoteCluster, long metadataVersion, @@ -154,7 +154,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) { @Override void getRemoteClusterState(String remoteCluster, long metadataVersion, @@ -209,7 +209,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) { @Override void getRemoteClusterState(String remoteCluster, long metadataVersion, @@ -266,7 +266,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState), () -> 1L) { @Override void getRemoteClusterState(String remoteCluster, long metadataVersion, @@ -532,8 +532,8 @@ public class AutoFollowCoordinatorTests extends ESTestCase { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( null, mock(ClusterService.class), - new CcrLicenseChecker(() -> true, () -> false) - ); + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); autoFollowCoordinator.updateStats(Collections.singletonList( new AutoFollowCoordinator.AutoFollowResult("_alias1")) @@ -585,6 +585,92 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); } + public void testUpdateAutoFollowers() { + ClusterService clusterService = mock(ClusterService.class); + // Return a cluster state with no patterns so that the auto followers never really execute: + ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + when(clusterService.state()).thenReturn(followerState); + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + // Remove patterns 1 and 3: + patterns.remove("pattern1"); + patterns.remove("pattern3"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + // Add pattern 4: + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + null, null, null, null, null, null, null, null)); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + // Remove patterns 2 and 4: + patterns.remove("pattern2"); + patterns.remove("pattern4"); + clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoPatterns() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + } + + public void testUpdateAutoFollowersNoAutoFollowMetadata() { + AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + null, + mock(ClusterService.class), + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L); + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build(); + autoFollowCoordinator.updateAutoFollowers(clusterState); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + } + public void testWaitForMetadataVersion() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); @@ -611,7 +697,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { List allResults = new ArrayList<>(); Consumer> handler = allResults::addAll; - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) { long previousRequestedMetadataVersion = 0; @@ -669,7 +755,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { fail("should not be invoked"); }; AtomicInteger counter = new AtomicInteger(); - AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states), () -> 1L) { long previousRequestedMetadataVersion = 0; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java index c651cca5b6a..41e771ac97e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions; +import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters; import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse; public class AutoFollowStatsResponseTests extends AbstractWireSerializingTestCase { @@ -27,7 +28,8 @@ public class AutoFollowStatsResponseTests extends AbstractWireSerializingTestCas randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), - randomReadExceptions() + randomReadExceptions(), + randomTrackingClusters() ); FollowStatsAction.StatsResponses statsResponse = createStatsResponse(); return new CcrStatsAction.Response(autoFollowStats, statsResponse); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java index c4a61529f49..61b92b485c1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster; import java.io.IOException; import java.util.Map; @@ -34,7 +35,8 @@ public class AutoFollowStatsTests extends AbstractSerializingTestCase randomTrackingClusters() { + final int count = randomIntBetween(0, 16); + final NavigableMap readExceptions = new TreeMap<>(); + for (int i = 0; i < count; i++) { + readExceptions.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong())); + } + return readExceptions; + } + @Override protected Writeable.Reader instanceReader() { return AutoFollowStats::new; @@ -56,6 +67,11 @@ public class AutoFollowStatsTests extends AbstractSerializingTestCase entry : newInstance.getRecentAutoFollowErrors().entrySet()) { @@ -68,6 +84,8 @@ public class AutoFollowStatsTests extends AbstractSerializingTestCase(Collections.singletonMap( randomAlphaOfLength(4), new ElasticsearchException("cannot follow index"))); + + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + new AutoFollowedCluster(1L, 1L))); final AutoFollowStats autoFollowStats = - new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions); + new AutoFollowStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), recentAutoFollowExceptions, + trackingClusters); final AutoFollowStatsMonitoringDoc document = new AutoFollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, autoFollowStats); @@ -99,7 +107,7 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase + "\"ccr_auto_follow_stats\":{" + "\"number_of_failed_follow_indices\":" + autoFollowStats.getNumberOfFailedFollowIndices() + "," + "\"number_of_failed_remote_cluster_state_requests\":" + - autoFollowStats.getNumberOfFailedRemoteClusterStateRequests() + "," + autoFollowStats.getNumberOfFailedRemoteClusterStateRequests() + "," + "\"number_of_successful_follow_indices\":" + autoFollowStats.getNumberOfSuccessfulFollowIndices() + "," + "\"recent_auto_follow_errors\":[" + "{" @@ -109,6 +117,15 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase + "\"reason\":\"cannot follow index\"" + "}" + "}" + + "]," + + "\"auto_followed_clusters\":[" + + "{" + + "\"cluster_name\":\"" + trackingClusters.keySet().iterator().next() + "\"," + + "\"time_since_last_check_millis\":" + + trackingClusters.values().iterator().next().getTimeSinceLastCheckMillis() + "," + + "\"last_seen_metadata_version\":" + + trackingClusters.values().iterator().next().getLastSeenMetadataVersion() + + "}" + "]" + "}" + "}")); @@ -117,7 +134,11 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException { final NavigableMap fetchExceptions = new TreeMap<>(Collections.singletonMap("leader_index", new ElasticsearchException("cannot follow index"))); - final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions); + final NavigableMap trackingClusters = + new TreeMap<>(Collections.singletonMap( + randomAlphaOfLength(4), + new AutoFollowedCluster(1L, 1L))); + final AutoFollowStats status = new AutoFollowStats(1, 0, 2, fetchExceptions, trackingClusters); XContentBuilder builder = jsonBuilder(); builder.value(status); Map serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false); @@ -142,18 +163,28 @@ public class AutoFollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase assertThat("expected keyword field type for field [" + fieldName + "]", fieldType, anyOf(equalTo("keyword"), equalTo("text"))); } else { + Map innerFieldValue = (Map) ((List) fieldValue).get(0); // Manual test specific object fields and if not just fail: if (fieldName.equals("recent_auto_follow_errors")) { assertThat(fieldType, equalTo("nested")); - assertThat(((Map) fieldMapping.get("properties")).size(), equalTo(2)); + assertThat(((Map) fieldMapping.get("properties")).size(), equalTo(innerFieldValue.size())); assertThat(XContentMapValues.extractValue("properties.leader_index.type", fieldMapping), equalTo("keyword")); assertThat(XContentMapValues.extractValue("properties.auto_follow_exception.type", fieldMapping), equalTo("object")); + innerFieldValue = (Map) innerFieldValue.get("auto_follow_exception"); Map exceptionFieldMapping = (Map) XContentMapValues.extractValue("properties.auto_follow_exception.properties", fieldMapping); - assertThat(exceptionFieldMapping.size(), equalTo(2)); + assertThat(exceptionFieldMapping.size(), equalTo(innerFieldValue.size())); assertThat(XContentMapValues.extractValue("type.type", exceptionFieldMapping), equalTo("keyword")); assertThat(XContentMapValues.extractValue("reason.type", exceptionFieldMapping), equalTo("text")); + } else if (fieldName.equals("auto_followed_clusters")) { + assertThat(fieldType, equalTo("nested")); + Map innerFieldMapping = ((Map) fieldMapping.get("properties")); + assertThat(innerFieldMapping.size(), equalTo(innerFieldValue.size())); + + assertThat(XContentMapValues.extractValue("cluster_name.type", innerFieldMapping), equalTo("keyword")); + assertThat(XContentMapValues.extractValue("time_since_last_check_millis.type", innerFieldMapping), equalTo("long")); + assertThat(XContentMapValues.extractValue("last_seen_metadata_version.type", innerFieldMapping), equalTo("long")); } else { fail("unexpected field value type [" + fieldValue.getClass() + "] for field [" + fieldName + "]"); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java index 6f28c450f04..032cedbdcdf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowStats.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ccr; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -17,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.AbstractMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -33,6 +35,10 @@ public class AutoFollowStats implements Writeable, ToXContentObject { private static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors"); private static final ParseField LEADER_INDEX = new ParseField("leader_index"); private static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception"); + private static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters"); + private static final ParseField CLUSTER_NAME = new ParseField("cluster_name"); + private static final ParseField TIME_SINCE_LAST_CHECK_MILLIS = new ParseField("time_since_last_check_millis"); + private static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats", @@ -43,26 +49,39 @@ public class AutoFollowStats implements Writeable, ToXContentObject { new TreeMap<>( ((List>) args[3]) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - )); + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))), + new TreeMap<>( + ((List>) args[4]) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); private static final ConstructingObjectParser, Void> AUTO_FOLLOW_EXCEPTIONS_PARSER = new ConstructingObjectParser<>( "auto_follow_stats_errors", args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1])); + private static final ConstructingObjectParser, Void> AUTO_FOLLOWED_CLUSTERS_PARSER = + new ConstructingObjectParser<>( + "auto_followed_clusters", + args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2]))); + static { AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX); AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject( ConstructingObjectParser.constructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), AUTO_FOLLOW_EXCEPTION); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_CHECK_MILLIS); + AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS); STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED); STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER, RECENT_AUTO_FOLLOW_ERRORS); + STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER, + AUTO_FOLLOWED_CLUSTERS); } public static AutoFollowStats fromXContent(final XContentParser parser) { @@ -73,24 +92,32 @@ public class AutoFollowStats implements Writeable, ToXContentObject { private final long numberOfFailedRemoteClusterStateRequests; private final long numberOfSuccessfulFollowIndices; private final NavigableMap recentAutoFollowErrors; + private final NavigableMap autoFollowedClusters; public AutoFollowStats( - long numberOfFailedFollowIndices, - long numberOfFailedRemoteClusterStateRequests, - long numberOfSuccessfulFollowIndices, - NavigableMap recentAutoFollowErrors + long numberOfFailedFollowIndices, + long numberOfFailedRemoteClusterStateRequests, + long numberOfSuccessfulFollowIndices, + NavigableMap recentAutoFollowErrors, + NavigableMap autoFollowedClusters ) { this.numberOfFailedFollowIndices = numberOfFailedFollowIndices; this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests; this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices; this.recentAutoFollowErrors = recentAutoFollowErrors; + this.autoFollowedClusters = autoFollowedClusters; } public AutoFollowStats(StreamInput in) throws IOException { numberOfFailedFollowIndices = in.readVLong(); numberOfFailedRemoteClusterStateRequests = in.readVLong(); numberOfSuccessfulFollowIndices = in.readVLong(); - recentAutoFollowErrors= new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + recentAutoFollowErrors = new TreeMap<>(in.readMap(StreamInput::readString, StreamInput::readException)); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + autoFollowedClusters = new TreeMap<>(in.readMap(StreamInput::readString, AutoFollowedCluster::new)); + } else { + autoFollowedClusters = Collections.emptyNavigableMap(); + } } @Override @@ -99,6 +126,9 @@ public class AutoFollowStats implements Writeable, ToXContentObject { out.writeVLong(numberOfFailedRemoteClusterStateRequests); out.writeVLong(numberOfSuccessfulFollowIndices); out.writeMap(recentAutoFollowErrors, StreamOutput::writeString, StreamOutput::writeException); + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeMap(autoFollowedClusters, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + } } public long getNumberOfFailedFollowIndices() { @@ -117,6 +147,10 @@ public class AutoFollowStats implements Writeable, ToXContentObject { return recentAutoFollowErrors; } + public NavigableMap getAutoFollowedClusters() { + return autoFollowedClusters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -148,6 +182,19 @@ public class AutoFollowStats implements Writeable, ToXContentObject { } } builder.endArray(); + builder.startArray(AUTO_FOLLOWED_CLUSTERS.getPreferredName()); + { + for (final Map.Entry entry : autoFollowedClusters.entrySet()) { + builder.startObject(); + { + builder.field(CLUSTER_NAME.getPreferredName(), entry.getKey()); + builder.field(TIME_SINCE_LAST_CHECK_MILLIS.getPreferredName(), entry.getValue().getTimeSinceLastCheckMillis()); + builder.field(LAST_SEEN_METADATA_VERSION.getPreferredName(), entry.getValue().getLastSeenMetadataVersion()); + } + builder.endObject(); + } + } + builder.endArray(); return builder; } @@ -165,7 +212,8 @@ public class AutoFollowStats implements Writeable, ToXContentObject { * keys. */ recentAutoFollowErrors.keySet().equals(that.recentAutoFollowErrors.keySet()) && - getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)); + getFetchExceptionMessages(this).equals(getFetchExceptionMessages(that)) && + Objects.equals(autoFollowedClusters, that.autoFollowedClusters); } @Override @@ -179,7 +227,8 @@ public class AutoFollowStats implements Writeable, ToXContentObject { * messages. Note that we are relying on the fact that the auto follow exceptions are ordered by keys. */ recentAutoFollowErrors.keySet(), - getFetchExceptionMessages(this) + getFetchExceptionMessages(this), + autoFollowedClusters ); } @@ -194,6 +243,58 @@ public class AutoFollowStats implements Writeable, ToXContentObject { ", numberOfFailedRemoteClusterStateRequests=" + numberOfFailedRemoteClusterStateRequests + ", numberOfSuccessfulFollowIndices=" + numberOfSuccessfulFollowIndices + ", recentAutoFollowErrors=" + recentAutoFollowErrors + + ", autoFollowedClusters=" + autoFollowedClusters + '}'; } + + public static class AutoFollowedCluster implements Writeable { + + private final long timeSinceLastCheckMillis; + private final long lastSeenMetadataVersion; + + public AutoFollowedCluster(long timeSinceLastCheckMillis, long lastSeenMetadataVersion) { + this.timeSinceLastCheckMillis = timeSinceLastCheckMillis; + this.lastSeenMetadataVersion = lastSeenMetadataVersion; + } + + public AutoFollowedCluster(StreamInput in) throws IOException { + this(in.readZLong(), in.readVLong()); + } + + public long getTimeSinceLastCheckMillis() { + return timeSinceLastCheckMillis; + } + + public long getLastSeenMetadataVersion() { + return lastSeenMetadataVersion; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeZLong(timeSinceLastCheckMillis); + out.writeVLong(lastSeenMetadataVersion); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AutoFollowedCluster that = (AutoFollowedCluster) o; + return timeSinceLastCheckMillis == that.timeSinceLastCheckMillis && + lastSeenMetadataVersion == that.lastSeenMetadataVersion; + } + + @Override + public int hashCode() { + return Objects.hash(timeSinceLastCheckMillis, lastSeenMetadataVersion); + } + + @Override + public String toString() { + return "AutoFollowedCluster{" + + "timeSinceLastCheckMillis=" + timeSinceLastCheckMillis + + ", lastSeenMetadataVersion=" + lastSeenMetadataVersion + + '}'; + } + } } diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index 1e6d3ec892a..c34fed37516 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -1060,6 +1060,20 @@ } } } + }, + "auto_followed_clusters": { + "type": "nested", + "properties": { + "cluster_name": { + "type": "keyword" + }, + "time_since_last_check_millis": { + "type": "long" + }, + "last_seen_metadata_version": { + "type": "long" + } + } } } }