From 7624734f14bc77acc299a448b108113a8694e5a3 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 26 Nov 2018 08:50:08 +0100 Subject: [PATCH] Added wait_for_metadata_version parameter to cluster state api. (#35535) The `wait_for_metadata_version` parameter will instruct the cluster state api to only return a cluster state until the metadata's version is equal or greater than the version specified in `wait_for_metadata_version`. If the specified `wait_for_timeout` has expired then a timed out response is returned. (a response with no cluster state and wait for timed out flag set to true) In the case metadata's version is equal or higher than `wait_for_metadata_version` then the api will immediately return. This feature is useful to avoid external components from constantly polling the cluster state to whether somethings have changed in the cluster state's metadata. --- ...rossClusterSearchUnavailableClusterIT.java | 2 +- .../rest-api-spec/api/cluster.state.json | 8 ++ .../cluster/state/ClusterStateRequest.java | 38 ++++++++ .../cluster/state/ClusterStateResponse.java | 88 +++++++++++++++++-- .../state/TransportClusterStateAction.java | 52 ++++++++++- .../admin/cluster/RestClusterStateAction.java | 8 ++ .../cluster/state/ClusterStateApiTests.java | 80 +++++++++++++++++ .../state/ClusterStateRequestTests.java | 23 +++++ .../state/ClusterStateResponseTests.java | 55 ++++++++++++ .../transport/FailAndRetryMockTransport.java | 2 +- .../TransportClientHeadersTests.java | 2 +- .../TransportClientNodesServiceTests.java | 2 +- .../rest/action/cat/RestNodesActionTests.java | 2 +- .../RemoteClusterConnectionTests.java | 2 +- 14 files changed, 351 insertions(+), 13 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateApiTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponseTests.java diff --git a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java index 0c42e4be89a..56764d44893 100644 --- a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java +++ b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java @@ -114,7 +114,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase { builder.add(node); } ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build(); - channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L)); + channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L, false)); }); newService.start(); newService.acceptIncomingRequests(); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json index 0f4bf6f21ad..fec3f450004 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json @@ -33,6 +33,14 @@ "type": "boolean", "description": "Return settings in flat format (default: false)" }, + "wait_for_metadata_version": { + "type": "number", + "description": "Wait for the metadata version to be equal or greater than the specified metadata version" + }, + "wait_for_timeout" : { + "type": "time", + "description": "The maximum time to wait for wait_for_metadata_version before timing out" + }, "ignore_unavailable": { "type" : "boolean", "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)" diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java index 33a20332526..ae55c60ccd4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.state; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -26,16 +27,21 @@ import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; public class ClusterStateRequest extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1); + private boolean routingTable = true; private boolean nodes = true; private boolean metaData = true; private boolean blocks = true; private boolean customs = true; + private Long waitForMetaDataVersion; + private TimeValue waitForTimeout = DEFAULT_WAIT_FOR_NODE_TIMEOUT; private String[] indices = Strings.EMPTY_ARRAY; private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen(); @@ -51,6 +57,11 @@ public class ClusterStateRequest extends MasterNodeReadRequest= 1, but instead is [" + + waitForMetaDataVersion + "]"); + } + this.waitForMetaDataVersion = waitForMetaDataVersion; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 71ad6f9f834..412e8483ddc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -24,11 +24,13 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; import java.io.IOException; +import java.util.Objects; /** * The response for getting the cluster state. @@ -40,14 +42,16 @@ public class ClusterStateResponse extends ActionResponse { // the total compressed size of the full cluster state, not just // the parts included in this response private ByteSizeValue totalCompressedSize; + private boolean waitForTimedOut = false; public ClusterStateResponse() { } - public ClusterStateResponse(ClusterName clusterName, ClusterState clusterState, long sizeInBytes) { + public ClusterStateResponse(ClusterName clusterName, ClusterState clusterState, long sizeInBytes, boolean waitForTimedOut) { this.clusterName = clusterName; this.clusterState = clusterState; this.totalCompressedSize = new ByteSizeValue(sizeInBytes); + this.waitForTimedOut = waitForTimedOut; } /** @@ -75,11 +79,24 @@ public class ClusterStateResponse extends ActionResponse { return totalCompressedSize; } + /** + * Returns whether the request timed out waiting for a cluster state with a metadata version equal or + * higher than the specified metadata. + */ + public boolean isWaitForTimedOut() { + return waitForTimedOut; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); clusterName = new ClusterName(in); - clusterState = ClusterState.readFrom(in, null); + // TODO: change version to V_6_6_0 after backporting: + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + clusterState = in.readOptionalWriteable(innerIn -> ClusterState.readFrom(innerIn, null)); + } else { + clusterState = ClusterState.readFrom(in, null); + } if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { totalCompressedSize = new ByteSizeValue(in); } else { @@ -89,19 +106,80 @@ public class ClusterStateResponse extends ActionResponse { // at which point the correct cluster state size will always be reported totalCompressedSize = new ByteSizeValue(0L); } + // TODO: change version to V_6_6_0 after backporting: + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + waitForTimedOut = in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); clusterName.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_6_3_0)) { - clusterState.writeTo(out); + // TODO: change version to V_6_6_0 after backporting: + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalWriteable(clusterState); } else { - ClusterModule.filterCustomsForPre63Clients(clusterState).writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + clusterState.writeTo(out); + } else { + ClusterModule.filterCustomsForPre63Clients(clusterState).writeTo(out); + } } if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { totalCompressedSize.writeTo(out); } + // TODO: change version to V_6_6_0 after backporting: + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeBoolean(waitForTimedOut); + } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateResponse response = (ClusterStateResponse) o; + return waitForTimedOut == response.waitForTimedOut && + Objects.equals(clusterName, response.clusterName) && + // Best effort. Only compare cluster state version and master node id, + // because cluster state doesn't implement equals() + Objects.equals(getVersion(clusterState), getVersion(response.clusterState)) && + Objects.equals(getMasterNodeId(clusterState), getMasterNodeId(response.clusterState)) && + Objects.equals(totalCompressedSize, response.totalCompressedSize); + } + + @Override + public int hashCode() { + // Best effort. Only use cluster state version and master node id, + // because cluster state doesn't implement hashcode() + return Objects.hash( + clusterName, + getVersion(clusterState), + getMasterNodeId(clusterState), + totalCompressedSize, + waitForTimedOut + ); + } + + private static String getMasterNodeId(ClusterState clusterState) { + if (clusterState == null) { + return null; + } + DiscoveryNodes nodes = clusterState.getNodes(); + if (nodes != null) { + return nodes.getMasterNodeId(); + } else { + return null; + } + } + + private static Long getVersion(ClusterState clusterState) { + if (clusterState != null) { + return clusterState.getVersion(); + } else { + return null; + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index 743bec1886c..5d72b5a1bfb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -33,10 +34,13 @@ import org.elasticsearch.cluster.metadata.MetaData.Custom; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.function.Predicate; import static org.elasticsearch.discovery.zen.PublishClusterStateAction.serializeFullClusterState; @@ -74,7 +78,51 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction listener) throws IOException { - ClusterState currentState = clusterService.state(); + + if (request.waitForMetaDataVersion() != null) { + final Predicate metadataVersionPredicate = clusterState -> { + return clusterState.metaData().version() >= request.waitForMetaDataVersion(); + }; + final ClusterStateObserver observer = + new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext()); + final ClusterState clusterState = observer.setAndGetObservedState(); + if (metadataVersionPredicate.test(clusterState)) { + buildResponse(request, clusterState, listener); + } else { + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + try { + buildResponse(request, state, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public void onClusterServiceClose() { + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + try { + listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, 0L, true)); + } catch (Exception e) { + listener.onFailure(e); + } + } + }, metadataVersionPredicate); + } + } else { + ClusterState currentState = clusterService.state(); + buildResponse(request, currentState, listener); + } + } + + private void buildResponse(final ClusterStateRequest request, + final ClusterState currentState, + final ActionListener listener) throws IOException { logger.trace("Serving cluster state request using version {}", currentState.version()); ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName()); builder.version(currentState.version()); @@ -133,7 +181,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction future1 = client().admin().cluster().state(clusterStateRequest); + assertThat(future1.isDone(), is(true)); + assertThat(future1.actionGet().isWaitForTimedOut(), is(false)); + long metadataVersion = future1.actionGet().getState().getMetaData().version(); + + // Verify that cluster state api returns after the cluster settings have been updated: + clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.waitForMetaDataVersion(metadataVersion + 1); + + ActionFuture future2 = client().admin().cluster().state(clusterStateRequest); + assertThat(future2.isDone(), is(false)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + // Pick an arbitrary dynamic cluster setting and change it. Just to get metadata version incremented: + updateSettingsRequest.transientSettings(Settings.builder().put("cluster.max_shards_per_node", 999)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + assertBusy(() -> { + assertThat(future2.isDone(), is(true)); + }); + ClusterStateResponse response = future2.actionGet(); + assertThat(response.isWaitForTimedOut(), is(false)); + assertThat(response.getState().metaData().version(), equalTo(metadataVersion + 1)); + + // Verify that the timed out property has been set" + metadataVersion = response.getState().getMetaData().version(); + clusterStateRequest.waitForMetaDataVersion(metadataVersion + 1); + clusterStateRequest.waitForTimeout(TimeValue.timeValueSeconds(1)); // Fail fast + ActionFuture future3 = client().admin().cluster().state(clusterStateRequest); + assertBusy(() -> { + assertThat(future3.isDone(), is(true)); + }); + response = future3.actionGet(); + assertThat(response.isWaitForTimedOut(), is(true)); + assertThat(response.getState(), nullValue()); + + // Remove transient setting, otherwise test fails with the reason that this test leaves state behind: + updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings(Settings.builder().put("cluster.max_shards_per_node", (String) null)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java index e2a07063d48..c4a414388cd 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -42,6 +43,16 @@ public class ClusterStateRequestTests extends ESTestCase { Version testVersion = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT); + // TODO: change version to V_6_6_0 after backporting: + if (testVersion.onOrAfter(Version.V_7_0_0)) { + if (randomBoolean()) { + clusterStateRequest.waitForMetaDataVersion(randomLongBetween(1, Long.MAX_VALUE)); + } + if (randomBoolean()) { + clusterStateRequest.waitForTimeout(new TimeValue(randomNonNegativeLong())); + } + } + BytesStreamOutput output = new BytesStreamOutput(); output.setVersion(testVersion); clusterStateRequest.writeTo(output); @@ -56,9 +67,21 @@ public class ClusterStateRequestTests extends ESTestCase { assertThat(deserializedCSRequest.blocks(), equalTo(clusterStateRequest.blocks())); assertThat(deserializedCSRequest.indices(), equalTo(clusterStateRequest.indices())); assertOptionsMatch(deserializedCSRequest.indicesOptions(), clusterStateRequest.indicesOptions()); + // TODO: change version to V_6_6_0 after backporting: + if (testVersion.onOrAfter(Version.V_7_0_0)) { + assertThat(deserializedCSRequest.waitForMetaDataVersion(), equalTo(clusterStateRequest.waitForMetaDataVersion())); + assertThat(deserializedCSRequest.waitForTimeout(), equalTo(clusterStateRequest.waitForTimeout())); + } } } + public void testWaitForMetaDataVersion() { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + expectThrows(IllegalArgumentException.class, + () -> clusterStateRequest.waitForMetaDataVersion(randomLongBetween(Long.MIN_VALUE, 0))); + clusterStateRequest.waitForMetaDataVersion(randomLongBetween(1, Long.MAX_VALUE)); + } + private static void assertOptionsMatch(IndicesOptions in, IndicesOptions out) { assertThat(in.ignoreUnavailable(), equalTo(out.ignoreUnavailable())); assertThat(in.expandWildcardsClosed(), equalTo(out.expandWildcardsClosed())); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponseTests.java new file mode 100644 index 00000000000..19dec5bcc59 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponseTests.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.state; + +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.test.AbstractStreamableTestCase; + +public class ClusterStateResponseTests extends AbstractStreamableTestCase { + + @Override + protected ClusterStateResponse createBlankInstance() { + return new ClusterStateResponse(); + } + + @Override + protected ClusterStateResponse createTestInstance() { + ClusterName clusterName = new ClusterName(randomAlphaOfLength(4)); + ClusterState clusterState = null; + if (randomBoolean()) { + ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterName) + .version(randomNonNegativeLong()); + if (randomBoolean()) { + clusterStateBuilder.nodes(DiscoveryNodes.builder().masterNodeId(randomAlphaOfLength(4)).build()); + } + clusterState = clusterStateBuilder.build(); + } + return new ClusterStateResponse(clusterName, clusterState, randomNonNegativeLong(), randomBoolean()); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + } +} diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index e9ff4048bfe..2e2b29e4478 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -99,7 +99,7 @@ abstract class FailAndRetryMockTransport imp } else if (ClusterStateAction.NAME.equals(action)) { TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); ClusterState clusterState = getMockClusterState(node); - transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, 0L)); + transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, 0L, false)); } else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener); Version version = node.getVersion(); diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index 82806938a0b..3efd447c468 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -175,7 +175,7 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT))); ((TransportResponseHandler) handler) - .handleResponse(new ClusterStateResponse(cluster1, builder.build(), 0L)); + .handleResponse(new ClusterStateResponse(cluster1, builder.build(), 0L, false)); clusterStateLatch.countDown(); } else if (TransportService.HANDSHAKE_ACTION_NAME .equals(action)) { ((TransportResponseHandler) handler).handleResponse( diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 41d691c95bd..0100962219d 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -416,7 +416,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { } DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build(); ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); - channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L)); + channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, 0L, false)); } void blockRequest() { diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestNodesActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestNodesActionTests.java index bf3d40af5e0..7a35177c8ad 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestNodesActionTests.java @@ -60,7 +60,7 @@ public class RestNodesActionTests extends ESTestCase { ClusterState clusterState = mock(ClusterState.class); when(clusterState.nodes()).thenReturn(discoveryNodes); - ClusterStateResponse clusterStateResponse = new ClusterStateResponse(clusterName, clusterState, randomNonNegativeLong()); + ClusterStateResponse clusterStateResponse = new ClusterStateResponse(clusterName, clusterState, randomNonNegativeLong(), false); NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(clusterName, Collections.emptyList(), Collections.emptyList()); NodesStatsResponse nodesStatsResponse = new NodesStatsResponse(clusterName, Collections.emptyList(), Collections.emptyList()); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index b6ba79da67b..e4d41d2e120 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -136,7 +136,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { builder.add(node); } ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build(); - channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L)); + channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L, false)); }); newService.start(); newService.acceptIncomingRequests();