From 6ec36b127367acb4e7a0046024bd75583dbc6f43 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 24 Sep 2018 17:47:10 -0400 Subject: [PATCH] CCR: Make AutoFollowMetadata immutable (#33977) We should make AutoFollowMetadata immutable to avoid being inconsistent when one thread modifies it while other reads it. --- .../ccr/action/AutoFollowCoordinator.java | 23 ++++++++++--------- .../TransportPutAutoFollowPatternAction.java | 9 ++++---- .../action/AutoFollowCoordinatorTests.java | 4 ++-- .../xpack/core/ccr/AutoFollowMetadata.java | 18 +++++++++------ 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 51aa44105ec..b4292272079 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -368,18 +368,19 @@ public class AutoFollowCoordinator implements ClusterStateApplier { Index indexToFollow) { return currentState -> { AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); - - Map> newFollowedIndexUUIDS = - new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); - newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID()); - - ClusterState.Builder newState = ClusterState.builder(currentState); - AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), + Map> newFollowedIndexUUIDS = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + newFollowedIndexUUIDS.compute(clusterAlias, (key, existingUUIDs) -> { + assert existingUUIDs != null; + List newUUIDs = new ArrayList<>(existingUUIDs); + newUUIDs.add(indexToFollow.getUUID()); + return Collections.unmodifiableList(newUUIDs); + }); + final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), newFollowedIndexUUIDS, currentAutoFollowMetadata.getHeaders()); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata) - .build()); - return newState.build(); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata).build()) + .build(); }; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 05ba40310a9..199b1215653 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -135,12 +135,13 @@ public class TransportPutAutoFollowPatternAction extends } AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias()); - List followedIndexUUIDs = followedLeaderIndices.get(request.getLeaderClusterAlias()); - if (followedIndexUUIDs == null) { + final List followedIndexUUIDs; + if (followedLeaderIndices.containsKey(request.getLeaderClusterAlias())) { + followedIndexUUIDs = new ArrayList<>(followedLeaderIndices.get(request.getLeaderClusterAlias())); + } else { followedIndexUUIDs = new ArrayList<>(); - followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs); } - + followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs); // Mark existing leader indices as already auto followed: if (previousPattern != null) { markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 8da5a75b6bb..82f31536039 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -85,7 +85,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { void getLeaderClusterState(Map headers, String leaderClusterAlias, BiConsumer handler) { - assertThat(headers, sameInstance(autoFollowHeaders.get("remote"))); + assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); handler.accept(leaderState, null); } @@ -94,7 +94,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { FollowIndexAction.Request followRequest, Runnable successHandler, Consumer failureHandler) { - assertThat(headers, sameInstance(autoFollowHeaders.get("remote"))); + assertThat(headers, equalTo(autoFollowHeaders.get("remote"))); assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101")); assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101")); successHandler.run(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index ed7a9fa5985..951320518b5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed. @@ -79,16 +80,19 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i public AutoFollowMetadata(Map patterns, Map> followedLeaderIndexUUIDs, Map> headers) { - this.patterns = patterns; - this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs; - this.headers = Collections.unmodifiableMap(headers); + this.patterns = Collections.unmodifiableMap(patterns); + this.followedLeaderIndexUUIDs = Collections.unmodifiableMap(followedLeaderIndexUUIDs.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableList(e.getValue())))); + this.headers = Collections.unmodifiableMap(headers.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(e.getValue())))); } public AutoFollowMetadata(StreamInput in) throws IOException { - patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new); - followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString); - headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, - valIn -> Collections.unmodifiableMap(valIn.readMap(StreamInput::readString, StreamInput::readString)))); + this( + in.readMap(StreamInput::readString, AutoFollowPattern::new), + in.readMapOfLists(StreamInput::readString, StreamInput::readString), + in.readMap(StreamInput::readString, valIn -> valIn.readMap(StreamInput::readString, StreamInput::readString)) + ); } public Map getPatterns() {