Add remote info to the HLRC (#50483)
The additional change to the original PR (#49657), is that `org.elasticsearch.client.cluster.RemoteConnectionInfo` now parses the initial_connect_timeout field as a string instead of a TimeValue instance. The reason that this is needed is because that the initial_connect_timeout field in the remote connection api is serialized for human consumption, but not for parsing purposes. Therefore the HLRC can't parse it correctly (which caused test failures in CI, but not in the PR CI :( ). The way this field is serialized needs to be changed in the remote connection api, but that is a breaking change. We should wait making this change until rest api versioning is introduced. Co-Authored-By: j-bean <anton.shuvaev91@gmail.com> Co-authored-by: j-bean <anton.shuvaev91@gmail.com>
This commit is contained in:
parent
33204c2055
commit
10ed1ae1d2
|
@ -26,6 +26,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -138,4 +140,33 @@ public final class ClusterClient {
|
|||
return restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, ClusterRequestConverters::clusterHealth, options,
|
||||
ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the remote cluster information using the Remote cluster info API.
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-remote-info.html"> Remote cluster info
|
||||
* API on elastic.co</a>
|
||||
* @param request the request
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @return the response
|
||||
* @throws IOException in case there is a problem sending the request or parsing back the response
|
||||
*/
|
||||
public RemoteInfoResponse remoteInfo(RemoteInfoRequest request, RequestOptions options) throws IOException {
|
||||
return restHighLevelClient.performRequestAndParseEntity(request, ClusterRequestConverters::remoteInfo, options,
|
||||
RemoteInfoResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously get remote cluster information using the Remote cluster info API.
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-remote-info.html"> Remote cluster info
|
||||
* API on elastic.co</a>
|
||||
* @param request the request
|
||||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
|
||||
* @param listener the listener to be notified upon request completion
|
||||
* @return cancellable that may be used to cancel the request
|
||||
*/
|
||||
public Cancellable remoteInfoAsync(RemoteInfoRequest request, RequestOptions options,
|
||||
ActionListener<RemoteInfoResponse> listener) {
|
||||
return restHighLevelClient.performRequestAsyncAndParseEntity(request, ClusterRequestConverters::remoteInfo, options,
|
||||
RemoteInfoResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -76,4 +77,8 @@ final class ClusterRequestConverters {
|
|||
request.addParameters(params.asMap());
|
||||
return request;
|
||||
}
|
||||
|
||||
static Request remoteInfo(RemoteInfoRequest remoteInfoRequest) {
|
||||
return new Request(HttpGet.METHOD_NAME, "/_remote/info");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
static final String NAME = "proxy";
|
||||
static final String ADDRESS = "address";
|
||||
static final String NUM_SOCKETS_CONNECTED = "num_sockets_connected";
|
||||
static final String MAX_SOCKET_CONNECTIONS = "max_socket_connections";
|
||||
private final String address;
|
||||
private final int maxSocketConnections;
|
||||
private final int numSocketsConnected;
|
||||
|
||||
ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
|
||||
this.address = address;
|
||||
this.maxSocketConnections = maxSocketConnections;
|
||||
this.numSocketsConnected = numSocketsConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return numSocketsConnected > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String modeName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public String getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public int getMaxSocketConnections() {
|
||||
return maxSocketConnections;
|
||||
}
|
||||
|
||||
public int getNumSocketsConnected() {
|
||||
return numSocketsConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ProxyModeInfo otherProxy = (ProxyModeInfo) o;
|
||||
return maxSocketConnections == otherProxy.maxSocketConnections &&
|
||||
numSocketsConnected == otherProxy.numSocketsConnected &&
|
||||
Objects.equals(address, otherProxy.address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(address, maxSocketConnections, numSocketsConnected);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
|
||||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
||||
|
||||
/**
|
||||
* This class encapsulates all remote cluster information to be rendered on
|
||||
* {@code _remote/info} requests.
|
||||
*/
|
||||
public final class RemoteConnectionInfo {
|
||||
private static final String CONNECTED = "connected";
|
||||
private static final String MODE = "mode";
|
||||
private static final String INITIAL_CONNECT_TIMEOUT = "initial_connect_timeout";
|
||||
private static final String SKIP_UNAVAILABLE = "skip_unavailable";
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<RemoteConnectionInfo, String> PARSER = new ConstructingObjectParser<>(
|
||||
"RemoteConnectionInfoObjectParser",
|
||||
false,
|
||||
(args, clusterAlias) -> {
|
||||
String mode = (String) args[1];
|
||||
ModeInfo modeInfo;
|
||||
if (mode.equals(ProxyModeInfo.NAME)) {
|
||||
modeInfo = new ProxyModeInfo((String) args[4], (int) args[5], (int) args[6]);
|
||||
} else if (mode.equals(SniffModeInfo.NAME)) {
|
||||
modeInfo = new SniffModeInfo((List<String>) args[7], (int) args[8], (int) args[9]);
|
||||
} else {
|
||||
throw new IllegalArgumentException("mode cannot be " + mode);
|
||||
}
|
||||
return new RemoteConnectionInfo(clusterAlias,
|
||||
modeInfo,
|
||||
(String) args[2],
|
||||
(boolean) args[3]);
|
||||
});
|
||||
|
||||
static {
|
||||
PARSER.declareBoolean(constructorArg(), new ParseField(CONNECTED));
|
||||
PARSER.declareString(constructorArg(), new ParseField(MODE));
|
||||
PARSER.declareString(constructorArg(), new ParseField(INITIAL_CONNECT_TIMEOUT));
|
||||
PARSER.declareBoolean(constructorArg(), new ParseField(SKIP_UNAVAILABLE));
|
||||
|
||||
PARSER.declareString(optionalConstructorArg(), new ParseField(ProxyModeInfo.ADDRESS));
|
||||
PARSER.declareInt(optionalConstructorArg(), new ParseField(ProxyModeInfo.MAX_SOCKET_CONNECTIONS));
|
||||
PARSER.declareInt(optionalConstructorArg(), new ParseField(ProxyModeInfo.NUM_SOCKETS_CONNECTED));
|
||||
|
||||
PARSER.declareStringArray(optionalConstructorArg(), new ParseField(SniffModeInfo.SEEDS));
|
||||
PARSER.declareInt(optionalConstructorArg(), new ParseField(SniffModeInfo.MAX_CONNECTIONS_PER_CLUSTER));
|
||||
PARSER.declareInt(optionalConstructorArg(), new ParseField(SniffModeInfo.NUM_NODES_CONNECTED));
|
||||
}
|
||||
|
||||
private final ModeInfo modeInfo;
|
||||
// TODO: deprecate and remove this field in favor of initialConnectionTimeout field that is of type TimeValue.
|
||||
// When rest api versioning exists then change org.elasticsearch.transport.RemoteConnectionInfo to properly serialize
|
||||
// the initialConnectionTimeout field so that we can properly parse initialConnectionTimeout as TimeValue
|
||||
private final String initialConnectionTimeoutString;
|
||||
private final String clusterAlias;
|
||||
private final boolean skipUnavailable;
|
||||
|
||||
RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, String initialConnectionTimeoutString, boolean skipUnavailable) {
|
||||
this.clusterAlias = clusterAlias;
|
||||
this.modeInfo = modeInfo;
|
||||
this.initialConnectionTimeoutString = initialConnectionTimeoutString;
|
||||
this.skipUnavailable = skipUnavailable;
|
||||
}
|
||||
|
||||
public boolean isConnected() {
|
||||
return modeInfo.isConnected();
|
||||
}
|
||||
|
||||
public String getClusterAlias() {
|
||||
return clusterAlias;
|
||||
}
|
||||
|
||||
public ModeInfo getModeInfo() {
|
||||
return modeInfo;
|
||||
}
|
||||
|
||||
public String getInitialConnectionTimeoutString() {
|
||||
return initialConnectionTimeoutString;
|
||||
}
|
||||
|
||||
public boolean isSkipUnavailable() {
|
||||
return skipUnavailable;
|
||||
}
|
||||
|
||||
public static RemoteConnectionInfo fromXContent(XContentParser parser, String clusterAlias) throws IOException {
|
||||
return PARSER.parse(parser, clusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
RemoteConnectionInfo that = (RemoteConnectionInfo) o;
|
||||
return skipUnavailable == that.skipUnavailable &&
|
||||
Objects.equals(modeInfo, that.modeInfo) &&
|
||||
Objects.equals(initialConnectionTimeoutString, that.initialConnectionTimeoutString) &&
|
||||
Objects.equals(clusterAlias, that.clusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(modeInfo, initialConnectionTimeoutString, clusterAlias, skipUnavailable);
|
||||
}
|
||||
|
||||
public interface ModeInfo {
|
||||
|
||||
boolean isConnected();
|
||||
|
||||
String modeName();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import org.elasticsearch.client.Validatable;
|
||||
|
||||
/**
|
||||
* The request object used by the Remote cluster info API.
|
||||
*/
|
||||
public final class RemoteInfoRequest implements Validatable {
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
|
||||
|
||||
/**
|
||||
* A response to _remote/info API request.
|
||||
*/
|
||||
public final class RemoteInfoResponse {
|
||||
|
||||
private List<RemoteConnectionInfo> infos;
|
||||
|
||||
RemoteInfoResponse(List<RemoteConnectionInfo> infos) {
|
||||
this.infos = Collections.unmodifiableList(infos);
|
||||
}
|
||||
|
||||
public List<RemoteConnectionInfo> getInfos() {
|
||||
return infos;
|
||||
}
|
||||
|
||||
public static RemoteInfoResponse fromXContent(XContentParser parser) throws IOException {
|
||||
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
|
||||
|
||||
List<RemoteConnectionInfo> infos = new ArrayList<>();
|
||||
|
||||
XContentParser.Token token;
|
||||
while ((token = parser.nextToken()) == XContentParser.Token.FIELD_NAME) {
|
||||
String clusterAlias = parser.currentName();
|
||||
RemoteConnectionInfo info = RemoteConnectionInfo.fromXContent(parser, clusterAlias);
|
||||
infos.add(info);
|
||||
}
|
||||
ensureExpectedToken(XContentParser.Token.END_OBJECT, token, parser::getTokenLocation);
|
||||
return new RemoteInfoResponse(infos);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
public static final String NAME = "sniff";
|
||||
static final String SEEDS = "seeds";
|
||||
static final String NUM_NODES_CONNECTED = "num_nodes_connected";
|
||||
static final String MAX_CONNECTIONS_PER_CLUSTER = "max_connections_per_cluster";
|
||||
final List<String> seedNodes;
|
||||
final int maxConnectionsPerCluster;
|
||||
final int numNodesConnected;
|
||||
|
||||
SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
|
||||
this.seedNodes = seedNodes;
|
||||
this.maxConnectionsPerCluster = maxConnectionsPerCluster;
|
||||
this.numNodesConnected = numNodesConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return numNodesConnected > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String modeName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
public List<String> getSeedNodes() {
|
||||
return seedNodes;
|
||||
}
|
||||
|
||||
public int getMaxConnectionsPerCluster() {
|
||||
return maxConnectionsPerCluster;
|
||||
}
|
||||
|
||||
public int getNumNodesConnected() {
|
||||
return numNodesConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
SniffModeInfo sniff = (SniffModeInfo) o;
|
||||
return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster &&
|
||||
numNodesConnected == sniff.numNodesConnected &&
|
||||
Objects.equals(seedNodes, sniff.seedNodes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected);
|
||||
}
|
||||
}
|
|
@ -19,10 +19,7 @@
|
|||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -51,9 +48,7 @@ import org.elasticsearch.client.core.BroadcastResponse;
|
|||
import org.elasticsearch.client.indices.CloseIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||
import org.junit.Before;
|
||||
|
@ -74,27 +69,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
|||
|
||||
@Before
|
||||
public void setupRemoteClusterConfig() throws Exception {
|
||||
// Configure local cluster as remote cluster:
|
||||
// TODO: replace with nodes info highlevel rest client code when it is available:
|
||||
final Request request = new Request("GET", "/_nodes");
|
||||
Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
|
||||
// Select node info of first node (we don't know the node id):
|
||||
nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
|
||||
String transportAddress = (String) nodesResponse.get("transport_address");
|
||||
|
||||
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
||||
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local_cluster.seeds", transportAddress));
|
||||
ClusterUpdateSettingsResponse updateSettingsResponse =
|
||||
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
|
||||
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
||||
|
||||
assertBusy(() -> {
|
||||
Map<?, ?> localConnection = (Map<?, ?>) toMap(client()
|
||||
.performRequest(new Request("GET", "/_remote/info")))
|
||||
.get("local_cluster");
|
||||
assertThat(localConnection, notNullValue());
|
||||
assertThat(localConnection.get("connected"), is(true));
|
||||
});
|
||||
setupRemoteClusterConfig("local_cluster");
|
||||
}
|
||||
|
||||
public void testIndexFollowing() throws Exception {
|
||||
|
@ -311,8 +286,4 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
|||
assertThat(pauseFollowResponse.isAcknowledged(), is(true));
|
||||
}
|
||||
|
||||
private static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,19 +27,27 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.client.cluster.RemoteConnectionInfo;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoResponse;
|
||||
import org.elasticsearch.client.cluster.SniffModeInfo;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.health.ClusterIndexHealth;
|
||||
import org.elasticsearch.cluster.health.ClusterShardHealth;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.RemoteClusterService;
|
||||
import org.elasticsearch.transport.SniffConnectionStrategy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
|
@ -297,4 +305,41 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
|
|||
assertNoIndices(response);
|
||||
}
|
||||
|
||||
public void testRemoteInfo() throws Exception {
|
||||
String clusterAlias = "local_cluster";
|
||||
setupRemoteClusterConfig(clusterAlias);
|
||||
|
||||
ClusterGetSettingsRequest settingsRequest = new ClusterGetSettingsRequest();
|
||||
settingsRequest.includeDefaults(true);
|
||||
ClusterGetSettingsResponse settingsResponse = highLevelClient().cluster().getSettings(settingsRequest, RequestOptions.DEFAULT);
|
||||
|
||||
List<String> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS
|
||||
.getConcreteSettingForNamespace(clusterAlias)
|
||||
.get(settingsResponse.getTransientSettings());
|
||||
int connectionsPerCluster = SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER
|
||||
.get(settingsResponse.getTransientSettings());
|
||||
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING
|
||||
.get(settingsResponse.getTransientSettings());
|
||||
boolean skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
|
||||
.getConcreteSettingForNamespace(clusterAlias)
|
||||
.get(settingsResponse.getTransientSettings());
|
||||
|
||||
RemoteInfoRequest request = new RemoteInfoRequest();
|
||||
RemoteInfoResponse response = execute(request, highLevelClient().cluster()::remoteInfo,
|
||||
highLevelClient().cluster()::remoteInfoAsync);
|
||||
|
||||
assertThat(response, notNullValue());
|
||||
assertThat(response.getInfos().size(), equalTo(1));
|
||||
RemoteConnectionInfo info = response.getInfos().get(0);
|
||||
assertThat(info.getClusterAlias(), equalTo(clusterAlias));
|
||||
assertThat(info.getInitialConnectionTimeoutString(), equalTo(initialConnectionTimeout.toString()));
|
||||
assertThat(info.isSkipUnavailable(), equalTo(skipUnavailable));
|
||||
assertThat(info.getModeInfo().modeName(), equalTo(SniffModeInfo.NAME));
|
||||
assertThat(info.getModeInfo().isConnected(), equalTo(true));
|
||||
SniffModeInfo sniffModeInfo = (SniffModeInfo) info.getModeInfo();
|
||||
assertThat(sniffModeInfo.getMaxConnectionsPerCluster(), equalTo(connectionsPerCluster));
|
||||
assertThat(sniffModeInfo.getNumNodesConnected(), equalTo(1));
|
||||
assertThat(sniffModeInfo.getSeedNodes(), equalTo(seeds));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest
|
|||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -37,6 +38,7 @@ import java.util.HashMap;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
@ -147,4 +149,12 @@ public class ClusterRequestConvertersTests extends ESTestCase {
|
|||
}
|
||||
Assert.assertThat(request.getParameters(), equalTo(expectedParams));
|
||||
}
|
||||
|
||||
public void testRemoteInfo() {
|
||||
RemoteInfoRequest request = new RemoteInfoRequest();
|
||||
Request expectedRequest = ClusterRequestConverters.remoteInfo(request);
|
||||
assertEquals("/_remote/info", expectedRequest.getEndpoint());
|
||||
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
|
||||
assertEquals(emptyMap(), expectedRequest.getParameters());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,19 +19,25 @@
|
|||
|
||||
package org.elasticsearch.client;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoResponse;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
@ -45,10 +51,15 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
|
||||
|
||||
|
@ -243,4 +254,30 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
|
|||
);
|
||||
highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
protected static void setupRemoteClusterConfig(String remoteClusterName) throws Exception {
|
||||
// Configure local cluster as remote cluster:
|
||||
// TODO: replace with nodes info highlevel rest client code when it is available:
|
||||
final Request request = new Request("GET", "/_nodes");
|
||||
Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
|
||||
// Select node info of first node (we don't know the node id):
|
||||
nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
|
||||
String transportAddress = (String) nodesResponse.get("transport_address");
|
||||
|
||||
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
||||
updateSettingsRequest.transientSettings(singletonMap("cluster.remote." + remoteClusterName + ".seeds", transportAddress));
|
||||
ClusterUpdateSettingsResponse updateSettingsResponse =
|
||||
restHighLevelClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
|
||||
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
||||
|
||||
assertBusy(() -> {
|
||||
RemoteInfoResponse response = highLevelClient().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT);
|
||||
assertThat(response, notNullValue());
|
||||
assertThat(response.getInfos().size(), greaterThan(0));
|
||||
});
|
||||
}
|
||||
|
||||
protected static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,8 +59,8 @@ import org.elasticsearch.client.ml.dataframe.DataFrameAnalysis;
|
|||
import org.elasticsearch.client.ml.dataframe.OutlierDetection;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.classification.AccuracyMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.classification.Classification;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredErrorMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.classification.MulticlassConfusionMatrixMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.regression.MeanSquaredErrorMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.regression.RSquaredMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.regression.Regression;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.AucRocMetric;
|
||||
|
@ -68,14 +68,14 @@ import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.Binar
|
|||
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.ConfusionMatrixMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.PrecisionMetric;
|
||||
import org.elasticsearch.client.ml.dataframe.evaluation.softclassification.RecallMetric;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.FrequencyEncoding;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.OneHotEncoding;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.TargetMeanEncoding;
|
||||
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.Ensemble;
|
||||
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.LogisticRegression;
|
||||
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedMode;
|
||||
import org.elasticsearch.client.ml.inference.trainedmodel.ensemble.WeightedSum;
|
||||
import org.elasticsearch.client.ml.inference.trainedmodel.tree.Tree;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.FrequencyEncoding;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.OneHotEncoding;
|
||||
import org.elasticsearch.client.ml.inference.preprocessing.TargetMeanEncoding;
|
||||
import org.elasticsearch.client.transform.transforms.SyncConfig;
|
||||
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
|
||||
import org.elasticsearch.common.CheckedFunction;
|
||||
|
@ -106,7 +106,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.test.InternalAggregationTestCase;
|
||||
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestApi;
|
||||
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
|
||||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -773,7 +772,6 @@ public class RestHighLevelClientTests extends ESTestCase {
|
|||
public void testApiNamingConventions() throws Exception {
|
||||
//this list should be empty once the high-level client is feature complete
|
||||
String[] notYetSupportedApi = new String[]{
|
||||
"cluster.remote_info",
|
||||
"create",
|
||||
"get_script_context",
|
||||
"get_script_languages",
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.cluster;
|
||||
|
||||
import org.elasticsearch.client.AbstractResponseTestCase;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.transport.ProxyConnectionStrategy;
|
||||
import org.elasticsearch.transport.RemoteConnectionInfo;
|
||||
import org.elasticsearch.transport.SniffConnectionStrategy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.function.Function.identity;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class RemoteInfoResponseTests extends AbstractResponseTestCase<org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse,
|
||||
RemoteInfoResponse> {
|
||||
|
||||
@Override
|
||||
protected org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse createServerTestInstance(XContentType xContentType) {
|
||||
int numRemoteInfos = randomIntBetween(0, 8);
|
||||
List<RemoteConnectionInfo> remoteInfos = new ArrayList<>();
|
||||
for (int i = 0; i < numRemoteInfos; i++) {
|
||||
remoteInfos.add(createRandomRemoteConnectionInfo());
|
||||
}
|
||||
return new org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse(remoteInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RemoteInfoResponse doParseToClientInstance(XContentParser parser) throws IOException {
|
||||
return RemoteInfoResponse.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertInstances(org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse serverTestInstance,
|
||||
RemoteInfoResponse clientInstance) {
|
||||
assertThat(clientInstance.getInfos().size(), equalTo(serverTestInstance.getInfos().size()));
|
||||
Map<String, RemoteConnectionInfo> serverInfos = serverTestInstance.getInfos().stream()
|
||||
.collect(toMap(RemoteConnectionInfo::getClusterAlias, identity()));
|
||||
for (org.elasticsearch.client.cluster.RemoteConnectionInfo clientRemoteInfo : clientInstance.getInfos()) {
|
||||
RemoteConnectionInfo serverRemoteInfo = serverInfos.get(clientRemoteInfo.getClusterAlias());
|
||||
assertThat(clientRemoteInfo.getClusterAlias(), equalTo(serverRemoteInfo.getClusterAlias()));
|
||||
assertThat(clientRemoteInfo.getInitialConnectionTimeoutString(),
|
||||
equalTo(serverRemoteInfo.getInitialConnectionTimeout().toString()));
|
||||
assertThat(clientRemoteInfo.isConnected(), equalTo(serverRemoteInfo.isConnected()));
|
||||
assertThat(clientRemoteInfo.isSkipUnavailable(), equalTo(serverRemoteInfo.isSkipUnavailable()));
|
||||
assertThat(clientRemoteInfo.getModeInfo().isConnected(), equalTo(serverRemoteInfo.getModeInfo().isConnected()));
|
||||
assertThat(clientRemoteInfo.getModeInfo().modeName(), equalTo(serverRemoteInfo.getModeInfo().modeName()));
|
||||
if (clientRemoteInfo.getModeInfo().modeName().equals(SniffModeInfo.NAME)) {
|
||||
SniffModeInfo clientModeInfo =
|
||||
(SniffModeInfo) clientRemoteInfo.getModeInfo();
|
||||
SniffConnectionStrategy.SniffModeInfo serverModeInfo =
|
||||
(SniffConnectionStrategy.SniffModeInfo) serverRemoteInfo.getModeInfo();
|
||||
assertThat(clientModeInfo.getMaxConnectionsPerCluster(), equalTo(serverModeInfo.getMaxConnectionsPerCluster()));
|
||||
assertThat(clientModeInfo.getNumNodesConnected(), equalTo(serverModeInfo.getNumNodesConnected()));
|
||||
assertThat(clientModeInfo.getSeedNodes(), equalTo(serverModeInfo.getSeedNodes()));
|
||||
} else if (clientRemoteInfo.getModeInfo().modeName().equals(ProxyModeInfo.NAME)) {
|
||||
ProxyModeInfo clientModeInfo =
|
||||
(ProxyModeInfo) clientRemoteInfo.getModeInfo();
|
||||
ProxyConnectionStrategy.ProxyModeInfo serverModeInfo =
|
||||
(ProxyConnectionStrategy.ProxyModeInfo) serverRemoteInfo.getModeInfo();
|
||||
assertThat(clientModeInfo.getAddress(), equalTo(serverModeInfo.getAddress()));
|
||||
assertThat(clientModeInfo.getMaxSocketConnections(), equalTo(serverModeInfo.getMaxSocketConnections()));
|
||||
assertThat(clientModeInfo.getNumSocketsConnected(), equalTo(serverModeInfo.getNumSocketsConnected()));
|
||||
} else {
|
||||
fail("impossible case");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static RemoteConnectionInfo createRandomRemoteConnectionInfo() {
|
||||
RemoteConnectionInfo.ModeInfo modeInfo;
|
||||
if (randomBoolean()) {
|
||||
String address = randomAlphaOfLength(8);
|
||||
int maxSocketConnections = randomInt(5);
|
||||
int numSocketsConnected = randomInt(5);
|
||||
modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(address, maxSocketConnections, numSocketsConnected);
|
||||
} else {
|
||||
List<String> seedNodes = randomList(randomInt(8), () -> randomAlphaOfLength(8));
|
||||
int maxConnectionsPerCluster = randomInt(5);
|
||||
int numNodesConnected = randomInt(5);
|
||||
modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, maxConnectionsPerCluster, numNodesConnected);
|
||||
}
|
||||
String clusterAlias = randomAlphaOfLength(8);
|
||||
TimeValue initialConnectionTimeout = TimeValue.parseTimeValue(randomTimeValue(), "randomInitialConnectionTimeout");
|
||||
boolean skipUnavailable = randomBoolean();
|
||||
return new RemoteConnectionInfo(clusterAlias, modeInfo, initialConnectionTimeout, skipUnavailable);
|
||||
}
|
||||
}
|
|
@ -19,11 +19,8 @@
|
|||
|
||||
package org.elasticsearch.client.documentation;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
|
||||
|
@ -57,8 +54,6 @@ import org.elasticsearch.client.core.BroadcastResponse;
|
|||
import org.elasticsearch.client.indices.CloseIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||
import org.junit.Before;
|
||||
|
||||
|
@ -76,21 +71,8 @@ import static org.hamcrest.Matchers.is;
|
|||
public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||
|
||||
@Before
|
||||
public void setupRemoteClusterConfig() throws IOException {
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
// Configure local cluster as remote cluster:
|
||||
// TODO: replace with nodes info highlevel rest client code when it is available:
|
||||
final Request request = new Request("GET", "/_nodes");
|
||||
Map<?, ?> nodesResponse = (Map<?, ?>) toMap(client().performRequest(request)).get("nodes");
|
||||
// Select node info of first node (we don't know the node id):
|
||||
nodesResponse = (Map<?, ?>) nodesResponse.get(nodesResponse.keySet().iterator().next());
|
||||
String transportAddress = (String) nodesResponse.get("transport_address");
|
||||
|
||||
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
|
||||
updateSettingsRequest.transientSettings(Collections.singletonMap("cluster.remote.local.seeds", transportAddress));
|
||||
ClusterUpdateSettingsResponse updateSettingsResponse =
|
||||
client.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
|
||||
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
||||
public void setupRemoteClusterConfig() throws Exception {
|
||||
setupRemoteClusterConfig("local");
|
||||
}
|
||||
|
||||
public void testPutFollow() throws Exception {
|
||||
|
@ -987,8 +969,4 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -31,6 +31,9 @@ import org.elasticsearch.action.support.ActiveShardCount;
|
|||
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.client.cluster.RemoteConnectionInfo;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoRequest;
|
||||
import org.elasticsearch.client.cluster.RemoteInfoResponse;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.health.ClusterIndexHealth;
|
||||
|
@ -46,6 +49,7 @@ import org.elasticsearch.rest.RestStatus;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -415,4 +419,60 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
|
|||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
public void testRemoteInfo() throws Exception {
|
||||
setupRemoteClusterConfig("local_cluster");
|
||||
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
|
||||
// tag::remote-info-request
|
||||
RemoteInfoRequest request = new RemoteInfoRequest();
|
||||
// end::remote-info-request
|
||||
|
||||
// tag::remote-info-execute
|
||||
RemoteInfoResponse response = client.cluster().remoteInfo(request, RequestOptions.DEFAULT); // <1>
|
||||
// end::remote-info-execute
|
||||
|
||||
// tag::remote-info-response
|
||||
List<RemoteConnectionInfo> infos = response.getInfos();
|
||||
// end::remote-info-response
|
||||
|
||||
assertThat(infos.size(), greaterThan(0));
|
||||
}
|
||||
|
||||
public void testRemoteInfoAsync() throws Exception {
|
||||
setupRemoteClusterConfig("local_cluster");
|
||||
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
|
||||
// tag::remote-info-request
|
||||
RemoteInfoRequest request = new RemoteInfoRequest();
|
||||
// end::remote-info-request
|
||||
|
||||
|
||||
// tag::remote-info-execute-listener
|
||||
ActionListener<RemoteInfoResponse> listener =
|
||||
new ActionListener<RemoteInfoResponse>() {
|
||||
@Override
|
||||
public void onResponse(RemoteInfoResponse response) {
|
||||
// <1>
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// <2>
|
||||
}
|
||||
};
|
||||
// end::remote-info-execute-listener
|
||||
|
||||
// Replace the empty listener by a blocking listener in test
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
listener = new LatchedActionListener<>(listener, latch);
|
||||
|
||||
// tag::health-execute-async
|
||||
client.cluster().remoteInfoAsync(request, RequestOptions.DEFAULT, listener); // <1>
|
||||
// end::health-execute-async
|
||||
|
||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.client.documentation;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
|
||||
|
@ -28,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
|
|||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.indexlifecycle.DeleteAction;
|
||||
|
@ -78,8 +76,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
|
@ -1210,8 +1206,4 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
--
|
||||
:api: remote-info
|
||||
:request: RemoteInfoRequest
|
||||
:response: RemoteInfoResponse
|
||||
--
|
||||
|
||||
[id="{upid}-{api}"]
|
||||
=== Remote Cluster Info API
|
||||
|
||||
The Remote cluster info API allows to get all of the configured remote cluster information.
|
||||
|
||||
[id="{upid}-{api}-request"]
|
||||
==== Remote Cluster Info Request
|
||||
|
||||
A +{request}+:
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-request]
|
||||
--------------------------------------------------
|
||||
|
||||
There are no required parameters.
|
||||
|
||||
==== Remote Cluster Info Response
|
||||
|
||||
The returned +{response}+ allows to retrieve remote cluster information.
|
||||
It returns connection and endpoint information keyed by the configured remote cluster alias.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-response]
|
||||
--------------------------------------------------
|
|
@ -168,12 +168,14 @@ The Java High Level REST Client supports the following Cluster APIs:
|
|||
* <<java-rest-high-cluster-put-settings>>
|
||||
* <<java-rest-high-cluster-get-settings>>
|
||||
* <<java-rest-high-cluster-health>>
|
||||
* <<java-rest-high-cluster-remote-info>>
|
||||
|
||||
:upid: {mainid}-cluster
|
||||
:doc-tests-file: {doc-tests}/ClusterClientDocumentationIT.java
|
||||
include::cluster/put_settings.asciidoc[]
|
||||
include::cluster/get_settings.asciidoc[]
|
||||
include::cluster/health.asciidoc[]
|
||||
include::cluster/remote_info.asciidoc[]
|
||||
|
||||
== Ingest APIs
|
||||
The Java High Level REST Client supports the following Ingest APIs:
|
||||
|
|
|
@ -41,7 +41,7 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
|
|||
infos = in.readList(RemoteConnectionInfo::new);
|
||||
}
|
||||
|
||||
RemoteInfoResponse(Collection<RemoteConnectionInfo> infos) {
|
||||
public RemoteInfoResponse(Collection<RemoteConnectionInfo> infos) {
|
||||
this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
|
||||
}
|
||||
|
||||
|
|
|
@ -268,13 +268,13 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
return new TransportAddress(parseConfiguredAddress(address));
|
||||
}
|
||||
|
||||
static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
public static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
|
||||
private final String address;
|
||||
private final int maxSocketConnections;
|
||||
private final int numSocketsConnected;
|
||||
|
||||
ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
|
||||
public ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) {
|
||||
this.address = address;
|
||||
this.maxSocketConnections = maxSocketConnections;
|
||||
this.numSocketsConnected = numSocketsConnected;
|
||||
|
@ -311,6 +311,18 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
|
|||
return "proxy";
|
||||
}
|
||||
|
||||
public String getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public int getMaxSocketConnections() {
|
||||
return maxSocketConnections;
|
||||
}
|
||||
|
||||
public int getNumSocketsConnected() {
|
||||
return numSocketsConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteConnectionStrategy.ConnectionStrategy modeType() {
|
||||
return RemoteConnectionStrategy.ConnectionStrategy.PROXY;
|
||||
|
|
|
@ -50,7 +50,7 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
final String clusterAlias;
|
||||
final boolean skipUnavailable;
|
||||
|
||||
RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) {
|
||||
public RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) {
|
||||
this.clusterAlias = clusterAlias;
|
||||
this.modeInfo = modeInfo;
|
||||
this.initialConnectionTimeout = initialConnectionTimeout;
|
||||
|
@ -103,6 +103,18 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
|
|||
return clusterAlias;
|
||||
}
|
||||
|
||||
public ModeInfo getModeInfo() {
|
||||
return modeInfo;
|
||||
}
|
||||
|
||||
public TimeValue getInitialConnectionTimeout() {
|
||||
return initialConnectionTimeout;
|
||||
}
|
||||
|
||||
public boolean isSkipUnavailable() {
|
||||
return skipUnavailable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
|
||||
|
|
|
@ -534,13 +534,13 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
return Objects.equals(oldProxy, newProxy) == false;
|
||||
}
|
||||
|
||||
static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
public static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {
|
||||
|
||||
final List<String> seedNodes;
|
||||
final int maxConnectionsPerCluster;
|
||||
final int numNodesConnected;
|
||||
|
||||
SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
|
||||
public SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
|
||||
this.seedNodes = seedNodes;
|
||||
this.maxConnectionsPerCluster = maxConnectionsPerCluster;
|
||||
this.numNodesConnected = numNodesConnected;
|
||||
|
@ -581,6 +581,18 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
return "sniff";
|
||||
}
|
||||
|
||||
public List<String> getSeedNodes() {
|
||||
return seedNodes;
|
||||
}
|
||||
|
||||
public int getMaxConnectionsPerCluster() {
|
||||
return maxConnectionsPerCluster;
|
||||
}
|
||||
|
||||
public int getNumNodesConnected() {
|
||||
return numNodesConnected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteConnectionStrategy.ConnectionStrategy modeType() {
|
||||
return RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
|
||||
|
|
|
@ -780,6 +780,19 @@ public abstract class ESTestCase extends LuceneTestCase {
|
|||
return array;
|
||||
}
|
||||
|
||||
public static <T> List<T> randomList(int maxListSize, Supplier<T> valueConstructor) {
|
||||
return randomList(0, maxListSize, valueConstructor);
|
||||
}
|
||||
|
||||
public static <T> List<T> randomList(int minListSize, int maxListSize, Supplier<T> valueConstructor) {
|
||||
final int size = randomIntBetween(minListSize, maxListSize);
|
||||
List<T> list = new ArrayList<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
list.add(valueConstructor.get());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
private static final String[] TIME_SUFFIXES = new String[]{"d", "h", "ms", "s", "m", "micros", "nanos"};
|
||||
|
||||
|
|
Loading…
Reference in New Issue