diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index d2cd5cdd4d9..51aa44105ec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -254,7 +254,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier { final String clusterAlias = entry.getKey(); final AutoFollowPattern autoFollowPattern = entry.getValue(); - getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> { + Map headers = autoFollowMetadata.getHeaders().get(clusterAlias); + getLeaderClusterState(headers, clusterAlias, (leaderClusterState, e) -> { if (leaderClusterState != null) { assert e == null; final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias); @@ -264,7 +265,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { finalise(slot, new AutoFollowResult(clusterAlias)); } else { Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler); + checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, headers, resultHandler); } } else { finalise(slot, new AutoFollowResult(clusterAlias, e)); @@ -274,15 +275,18 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } } - private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern, - List leaderIndicesToFollow, Consumer resultHandler) { + private void checkAutoFollowPattern(String clusterAlias, + AutoFollowPattern autoFollowPattern, + List leaderIndicesToFollow, + Map headers, + Consumer resultHandler) { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); final AtomicArray> results = new AtomicArray<>(leaderIndicesToFollow.size()); for (int i = 0; i < leaderIndicesToFollow.size(); i++) { final Index indexToFollow = leaderIndicesToFollow.get(i); final int slot = i; - followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> { + followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, headers, error -> { results.set(slot, new Tuple<>(indexToFollow, error)); if (leaderIndicesCountDown.countDown()) { resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList())); @@ -291,8 +295,11 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } } - private void followLeaderIndex(String clusterAlias, Index indexToFollow, - AutoFollowPattern pattern, Consumer onResult) { + private void followLeaderIndex(String clusterAlias, + Index indexToFollow, + AutoFollowPattern pattern, + Map headers, + Consumer onResult) { final String leaderIndexName = indexToFollow.getName(); final String followIndexName = getFollowerIndexName(pattern, leaderIndexName); @@ -319,7 +326,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier { // The coordinator always runs on the elected master node, so we can update cluster state here: updateAutoFollowMetadata(function, onResult); }; - createAndFollow(pattern.getHeaders(), request, successHandler, onResult); + createAndFollow(headers, request, successHandler, onResult); } private void finalise(int slot, AutoFollowResult result) { @@ -357,7 +364,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier { } } - static Function recordLeaderIndexAsFollowFunction(String clusterAlias, Index indexToFollow) { + static Function recordLeaderIndexAsFollowFunction(String clusterAlias, + Index indexToFollow) { return currentState -> { AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); @@ -366,8 +374,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier { newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID()); ClusterState.Builder newState = ClusterState.builder(currentState); - AutoFollowMetadata newAutoFollowMetadata = - new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS); + AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), + newFollowedIndexUUIDS, currentAutoFollowMetadata.getHeaders()); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) .build()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java index 8d2e59defd8..974bca98859 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java @@ -85,10 +85,12 @@ public class TransportDeleteAutoFollowPatternAction extends final Map patternsCopy = new HashMap<>(patterns); final Map> followedLeaderIndexUUIDSCopy = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + final Map> headers = new HashMap<>(currentAutoFollowMetadata.getHeaders()); patternsCopy.remove(request.getLeaderClusterAlias()); followedLeaderIndexUUIDSCopy.remove(request.getLeaderClusterAlias()); + headers.remove(request.getLeaderClusterAlias()); - AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy); + AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy, headers); ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 2f9dd02648d..05ba40310a9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -123,12 +123,15 @@ public class TransportPutAutoFollowPatternAction extends AutoFollowMetadata currentAutoFollowMetadata = localState.metaData().custom(AutoFollowMetadata.TYPE); Map> followedLeaderIndices; Map patterns; + Map> headers; if (currentAutoFollowMetadata != null) { patterns = new HashMap<>(currentAutoFollowMetadata.getPatterns()); followedLeaderIndices = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + headers = new HashMap<>(currentAutoFollowMetadata.getHeaders()); } else { patterns = new HashMap<>(); followedLeaderIndices = new HashMap<>(); + headers = new HashMap<>(); } AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias()); @@ -147,6 +150,10 @@ public class TransportPutAutoFollowPatternAction extends followedIndexUUIDs); } + if (filteredHeaders != null) { + headers.put(request.getLeaderClusterAlias(), filteredHeaders); + } + AutoFollowPattern autoFollowPattern = new AutoFollowPattern( request.getLeaderIndexPatterns(), request.getFollowIndexNamePattern(), @@ -156,12 +163,11 @@ public class TransportPutAutoFollowPatternAction extends request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), request.getMaxRetryDelay(), - request.getPollTimeout(), - filteredHeaders); + request.getPollTimeout()); patterns.put(request.getLeaderClusterAlias(), autoFollowPattern); ClusterState.Builder newState = ClusterState.builder(localState); newState.metaData(MetaData.builder(localState.getMetaData()) - .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices)) + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, followedLeaderIndices, headers)) .build()); return newState.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 5ef7b4093ae..99657792e92 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,6 +35,7 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase configs = new HashMap<>(numEntries); Map> followedLeaderIndices = new HashMap<>(numEntries); + Map> headers = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { List leaderPatterns = Arrays.asList(generateRandomStringArray(4, 4, false)); AutoFollowMetadata.AutoFollowPattern autoFollowPattern = new AutoFollowMetadata.AutoFollowPattern( @@ -47,12 +47,19 @@ public class AutoFollowMetadataTests extends AbstractSerializingTestCase header = new HashMap<>(); + for (int j = 0; j < numHeaderEntries; j++) { + header.put(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + headers.put(Integer.toString(i), header); + } } - return new AutoFollowMetadata(configs, followedLeaderIndices); + return new AutoFollowMetadata(configs, followedLeaderIndices, headers); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index a74b1e33cd2..434f524b255 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -140,11 +140,11 @@ public class CcrLicenseIT extends ESSingleNodeTestCase { @Override public ClusterState execute(ClusterState currentState) throws Exception { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( Collections.singletonMap("test_alias", autoFollowPattern), - Collections.emptyMap() - ); + Collections.emptyMap(), + Collections.emptyMap()); ClusterState.Builder newState = ClusterState.builder(currentState); newState.metaData(MetaData.builder(currentState.getMetaData()) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 218825e4120..8da5a75b6bb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -56,12 +56,14 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); ClusterState currentState = ClusterState.builder(new ClusterName("name")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) @@ -83,6 +85,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { void getLeaderClusterState(Map headers, String leaderClusterAlias, BiConsumer handler) { + assertThat(headers, sameInstance(autoFollowHeaders.get("remote"))); handler.accept(leaderState, null); } @@ -91,6 +94,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { + assertThat(headers, sameInstance(autoFollowHeaders.get("remote"))); assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); @@ -115,12 +119,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + Map> headers = new HashMap<>(); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -172,12 +177,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + Map> headers = new HashMap<>(); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -233,12 +239,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .build(); AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); - AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS); + Map> headers = new HashMap<>(); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -285,10 +292,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetLeaderIndicesToFollow() { AutoFollowPattern autoFollowPattern = - new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null); + new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null); + Map> headers = new HashMap<>(); ClusterState followerState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap()))) + new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) .build(); MetaData.Builder imdBuilder = MetaData.builder(); @@ -331,15 +339,15 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), null, null, - null, null, null, null, null, null, null); + null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, - null, null, null, null, null, null); + null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); autoFollowPattern = new AutoFollowPattern(Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, - null, null, null, null, null, null, null); + null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java index 2525b63de31..e2280316264 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.Req import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,30 +26,33 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { public void testInnerDelete() { Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); + Map> existingHeaders = new HashMap<>(); Map existingAutoFollowPatterns = new HashMap<>(); { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + existingHeaders.put("eu_cluster", Collections.singletonMap("key", "val")); } { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); existingAutoFollowPatterns.put("asia_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); existingAlreadyFollowedIndexUUIDS.put("asia_cluster", existingUUIDS); + existingHeaders.put("asia_cluster", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders))) .build(); Request request = new Request(); @@ -60,20 +64,24 @@ public class TransportDeleteAutoFollowPatternActionTests extends ESTestCase { assertThat(result.getPatterns().get("asia_cluster"), notNullValue()); assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); assertThat(result.getFollowedLeaderIndexUUIDs().get("asia_cluster"), notNullValue()); + assertThat(result.getHeaders().size(), equalTo(1)); + assertThat(result.getHeaders().get("asia_cluster"), notNullValue()); } public void testInnerDeleteDoesNotExist() { Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); Map existingAutoFollowPatterns = new HashMap<>(); + Map> existingHeaders = new HashMap<>(); { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); + existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders))) .build(); Request request = new Request(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index 5731a64ba89..c208a4b042d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -97,14 +97,17 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("eu_cluster", - new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null, null)); + new AutoFollowMetadata.AutoFollowPattern(existingPatterns, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); existingAlreadyFollowedIndexUUIDS.put("eu_cluster", existingUUIDS); + Map> existingHeaders = new HashMap<>(); + existingHeaders.put("eu_cluster", Collections.singletonMap("key", "val")); + ClusterState localState = ClusterState.builder(new ClusterName("us_cluster")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS))) + new AutoFollowMetadata(existingAutoFollowPatterns, existingAlreadyFollowedIndexUUIDS, existingHeaders))) .build(); int numLeaderIndices = randomIntBetween(1, 8); @@ -129,6 +132,8 @@ public class TransportPutAutoFollowPatternActionTests extends ESTestCase { assertThat(autoFollowMetadata.getPatterns().get("eu_cluster").getLeaderIndexPatterns().get(1), equalTo("transactions-*")); assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("eu_cluster").size(), equalTo(numLeaderIndices + 1)); + assertThat(autoFollowMetadata.getHeaders().size(), equalTo(1)); + assertThat(autoFollowMetadata.getHeaders().get("eu_cluster"), notNullValue()); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 75832271bee..d4440068a57 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -21,10 +21,8 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.security.xcontent.XContentUtils; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -41,10 +39,15 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private static final ParseField PATTERNS_FIELD = new ParseField("patterns"); private static final ParseField FOLLOWED_LEADER_INDICES_FIELD = new ParseField("followed_leader_indices"); + private static final ParseField HEADERS = new ParseField("headers"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow", - args -> new AutoFollowMetadata((Map) args[0], (Map>) args[1])); + args -> new AutoFollowMetadata( + (Map) args[0], + (Map>) args[1], + (Map>) args[2] + )); static { PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { @@ -61,20 +64,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i } return patterns; }, PATTERNS_FIELD); - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { - Map> alreadyFollowedIndexUUIDS = new HashMap<>(); - String fieldName = null; - for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { - if (token == XContentParser.Token.FIELD_NAME) { - fieldName = p.currentName(); - } else if (token == XContentParser.Token.START_ARRAY) { - alreadyFollowedIndexUUIDS.put(fieldName, Arrays.asList(XContentUtils.readStringArray(p, false))); - } else { - throw new ElasticsearchParseException("unexpected token [" + token + "]"); - } - } - return alreadyFollowedIndexUUIDS; - }, FOLLOWED_LEADER_INDICES_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), FOLLOWED_LEADER_INDICES_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), HEADERS); } public static AutoFollowMetadata fromXContent(XContentParser parser) throws IOException { @@ -83,15 +74,21 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private final Map patterns; private final Map> followedLeaderIndexUUIDs; + private final Map> headers; - public AutoFollowMetadata(Map patterns, Map> followedLeaderIndexUUIDs) { + public AutoFollowMetadata(Map patterns, + Map> followedLeaderIndexUUIDs, + Map> headers) { this.patterns = patterns; this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs; + this.headers = Collections.unmodifiableMap(headers); } public AutoFollowMetadata(StreamInput in) throws IOException { patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new); followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString); + headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, + valIn -> Collections.unmodifiableMap(valIn.readMap(StreamInput::readString, StreamInput::readString)))); } public Map getPatterns() { @@ -102,11 +99,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return followedLeaderIndexUUIDs; } + public Map> getHeaders() { + return headers; + } + @Override public EnumSet context() { - // TODO: When a snapshot is restored do we want to restore this? - // (Otherwise we would start following indices automatically immediately) - return MetaData.ALL_CONTEXTS; + // No XContentContext.API, because the headers should not be serialized as part of clusters state api + return EnumSet.of(MetaData.XContentContext.SNAPSHOT, MetaData.XContentContext.GATEWAY); } @Override @@ -123,6 +123,8 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i public void writeTo(StreamOutput out) throws IOException { out.writeMap(patterns, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); out.writeMapOfLists(followedLeaderIndexUUIDs, StreamOutput::writeString, StreamOutput::writeString); + out.writeMap(headers, StreamOutput::writeString, + (valOut, header) -> valOut.writeMap(header, StreamOutput::writeString, StreamOutput::writeString)); } @Override @@ -140,6 +142,11 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i builder.field(entry.getKey(), entry.getValue()); } builder.endObject(); + builder.startObject(HEADERS.getPreferredName()); + for (Map.Entry> entry : headers.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); return builder; } @@ -172,14 +179,12 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); - private static final ParseField HEADERS = new ParseField("headers"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", args -> new AutoFollowPattern((List) args[0], (String) args[1], (Integer) args[2], (Integer) args[3], - (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8], - (Map) args[9])); + (Long) args[4], (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8])); static { PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); @@ -195,7 +200,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), POLL_TIMEOUT, ObjectParser.ValueType.STRING); - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final List leaderIndexPatterns; @@ -207,7 +211,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i private final Integer maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; - private final Map headers; public AutoFollowPattern(List leaderIndexPatterns, String followIndexPattern, @@ -217,8 +220,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Integer maxConcurrentWriteBatches, Integer maxWriteBufferSize, TimeValue maxRetryDelay, - TimeValue pollTimeout, - Map headers) { + TimeValue pollTimeout) { this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; this.maxBatchOperationCount = maxBatchOperationCount; @@ -228,10 +230,9 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; - this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } - AutoFollowPattern(StreamInput in) throws IOException { + public AutoFollowPattern(StreamInput in) throws IOException { leaderIndexPatterns = in.readList(StreamInput::readString); followIndexPattern = in.readOptionalString(); maxBatchOperationCount = in.readOptionalVInt(); @@ -241,7 +242,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxWriteBufferSize = in.readOptionalVInt(); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); - this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public boolean match(String indexName) { @@ -288,10 +288,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i return pollTimeout; } - public Map getHeaders() { - return headers; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeStringList(leaderIndexPatterns); @@ -303,7 +299,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i out.writeOptionalVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); - out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @Override @@ -333,7 +328,6 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i if (pollTimeout != null) { builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout); } - builder.field(HEADERS.getPreferredName(), headers); return builder; } @@ -355,8 +349,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) && Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(pollTimeout, that.pollTimeout) && - Objects.equals(headers, that.headers); + Objects.equals(pollTimeout, that.pollTimeout); } @Override @@ -370,8 +363,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i maxConcurrentWriteBatches, maxWriteBufferSize, maxRetryDelay, - pollTimeout, - headers + pollTimeout ); } }