Add a dedicated TransportRemoteInfoAction for consistency (#24040)
All our actions that are invoked from rest actions have corresponding transport actions. This adds the transport action for RestRemoteClusterInfoAction for consistency. Relates to #23969
This commit is contained in:
parent
42e0b4f5e9
commit
e30a275bfe
|
@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
|
|||
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
|
||||
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
|
||||
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
|
||||
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
|
||||
|
@ -173,7 +175,6 @@ import org.elasticsearch.action.main.MainAction;
|
|||
import org.elasticsearch.action.main.TransportMainAction;
|
||||
import org.elasticsearch.action.search.ClearScrollAction;
|
||||
import org.elasticsearch.action.search.MultiSearchAction;
|
||||
import org.elasticsearch.action.search.RemoteClusterService;
|
||||
import org.elasticsearch.action.search.SearchAction;
|
||||
import org.elasticsearch.action.search.SearchScrollAction;
|
||||
import org.elasticsearch.action.search.TransportClearScrollAction;
|
||||
|
@ -402,6 +403,7 @@ public class ActionModule extends AbstractModule {
|
|||
|
||||
actions.register(MainAction.INSTANCE, TransportMainAction.class);
|
||||
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
|
||||
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
|
||||
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
|
||||
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
|
||||
actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
|
||||
|
@ -502,7 +504,7 @@ public class ActionModule extends AbstractModule {
|
|||
return unmodifiableList(actionPlugins.stream().flatMap(p -> p.getActionFilters().stream()).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, RemoteClusterService remoteClusterService) {
|
||||
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
|
||||
List<AbstractCatAction> catActions = new ArrayList<>();
|
||||
Consumer<RestHandler> registerHandler = a -> {
|
||||
if (a instanceof AbstractCatAction) {
|
||||
|
@ -511,7 +513,7 @@ public class ActionModule extends AbstractModule {
|
|||
};
|
||||
registerHandler.accept(new RestMainAction(settings, restController));
|
||||
registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
|
||||
registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController, remoteClusterService));
|
||||
registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
|
||||
registerHandler.accept(new RestNodesStatsAction(settings, restController));
|
||||
registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
|
||||
registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.remote;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public final class RemoteInfoAction extends Action<RemoteInfoRequest, RemoteInfoResponse, RemoteInfoRequestBuilder> {
|
||||
|
||||
public static final String NAME = "cluster:monitor/remote/info";
|
||||
public static final RemoteInfoAction INSTANCE = new RemoteInfoAction();
|
||||
|
||||
public RemoteInfoAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteInfoRequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new RemoteInfoRequestBuilder(client, INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteInfoResponse newResponse() {
|
||||
return new RemoteInfoResponse();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.remote;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
|
||||
public final class RemoteInfoRequest extends ActionRequest {
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.remote;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public final class RemoteInfoRequestBuilder extends ActionRequestBuilder<RemoteInfoRequest, RemoteInfoResponse, RemoteInfoRequestBuilder> {
|
||||
|
||||
public RemoteInfoRequestBuilder(ElasticsearchClient client, RemoteInfoAction action) {
|
||||
super(client, action, new RemoteInfoRequest());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.remote;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.search.RemoteConnectionInfo;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public final class RemoteInfoResponse extends ActionResponse implements ToXContentObject {
|
||||
|
||||
private List<RemoteConnectionInfo> infos;
|
||||
|
||||
RemoteInfoResponse() {
|
||||
}
|
||||
|
||||
RemoteInfoResponse(Collection<RemoteConnectionInfo> infos) {
|
||||
this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeList(infos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
infos = in.readList(RemoteConnectionInfo::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
for (RemoteConnectionInfo info : infos) {
|
||||
info.toXContent(builder, params);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.remote;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.search.RemoteClusterService;
|
||||
import org.elasticsearch.action.search.SearchTransportService;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {
|
||||
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
|
||||
@Inject
|
||||
public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchTransportService searchTransportService) {
|
||||
super(settings, RemoteInfoAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
|
||||
RemoteInfoRequest::new);
|
||||
this.remoteClusterService = searchTransportService.getRemoteClusterService();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
|
||||
remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
|
||||
-> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
|
||||
}
|
||||
}
|
|
@ -573,8 +573,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
|
|||
}
|
||||
}
|
||||
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
|
||||
seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toSet()), httpAddresses, maxNumRemoteConnections,
|
||||
connectedNodes.size(), RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
|
||||
seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
|
||||
maxNumRemoteConnections, connectedNodes.size(),
|
||||
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
|
||||
listener.onResponse(remoteConnectionInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,28 +18,32 @@
|
|||
*/
|
||||
package org.elasticsearch.action.search;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* This class encapsulates all remote cluster information to be rendered on
|
||||
* <tt>_remote/info</tt> requests.
|
||||
*/
|
||||
public final class RemoteConnectionInfo implements ToXContent {
|
||||
final Collection<TransportAddress> seedNodes;
|
||||
final Collection<TransportAddress> httpAddresses;
|
||||
public final class RemoteConnectionInfo implements ToXContent, Writeable {
|
||||
final List<TransportAddress> seedNodes;
|
||||
final List<TransportAddress> httpAddresses;
|
||||
final int connectionsPerCluster;
|
||||
final TimeValue initialConnectionTimeout;
|
||||
final int numNodesConnected;
|
||||
final String clusterAlias;
|
||||
|
||||
RemoteConnectionInfo(String clusterAlias, Collection<TransportAddress> seedNodes,
|
||||
Collection<TransportAddress> httpAddresses,
|
||||
RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
|
||||
List<TransportAddress> httpAddresses,
|
||||
int connectionsPerCluster, int numNodesConnected,
|
||||
TimeValue initialConnectionTimeout) {
|
||||
this.clusterAlias = clusterAlias;
|
||||
|
@ -50,6 +54,15 @@ public final class RemoteConnectionInfo implements ToXContent {
|
|||
this.initialConnectionTimeout = initialConnectionTimeout;
|
||||
}
|
||||
|
||||
public RemoteConnectionInfo(StreamInput input) throws IOException {
|
||||
seedNodes = input.readList(TransportAddress::new);
|
||||
httpAddresses = input.readList(TransportAddress::new);
|
||||
connectionsPerCluster = input.readVInt();
|
||||
initialConnectionTimeout = new TimeValue(input);
|
||||
numNodesConnected = input.readVInt();
|
||||
clusterAlias = input.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(clusterAlias);
|
||||
|
@ -72,4 +85,32 @@ public final class RemoteConnectionInfo implements ToXContent {
|
|||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeList(seedNodes);
|
||||
out.writeList(httpAddresses);
|
||||
out.writeVInt(connectionsPerCluster);
|
||||
initialConnectionTimeout.writeTo(out);
|
||||
out.writeVInt(numNodesConnected);
|
||||
out.writeString(clusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
RemoteConnectionInfo that = (RemoteConnectionInfo) o;
|
||||
return connectionsPerCluster == that.connectionsPerCluster &&
|
||||
numNodesConnected == that.numNodesConnected &&
|
||||
Objects.equals(seedNodes, that.seedNodes) &&
|
||||
Objects.equals(httpAddresses, that.httpAddresses) &&
|
||||
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
|
||||
Objects.equals(clusterAlias, that.clusterAlias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, numNodesConnected, clusterAlias);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -486,7 +486,7 @@ public class Node implements Closeable {
|
|||
|
||||
if (NetworkModule.HTTP_ENABLED.get(settings)) {
|
||||
logger.debug("initializing HTTP handlers ...");
|
||||
actionModule.initRestHandlers(() -> clusterService.state().nodes(), searchTransportService.getRemoteClusterService());
|
||||
actionModule.initRestHandlers(() -> clusterService.state().nodes());
|
||||
}
|
||||
logger.info("initialized");
|
||||
|
||||
|
|
|
@ -19,8 +19,9 @@
|
|||
|
||||
package org.elasticsearch.rest.action.admin.cluster;
|
||||
|
||||
import org.elasticsearch.action.search.RemoteClusterService;
|
||||
import org.elasticsearch.action.search.RemoteConnectionInfo;
|
||||
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
|
||||
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
|
||||
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoResponse;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -30,40 +31,28 @@ import org.elasticsearch.rest.RestController;
|
|||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.RestResponseListener;
|
||||
import org.elasticsearch.rest.action.RestBuilderListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.GET;
|
||||
|
||||
public final class RestRemoteClusterInfoAction extends BaseRestHandler {
|
||||
|
||||
private final RemoteClusterService remoteClusterService;
|
||||
|
||||
public RestRemoteClusterInfoAction(Settings settings, RestController controller,
|
||||
RemoteClusterService remoteClusterService) {
|
||||
public RestRemoteClusterInfoAction(Settings settings, RestController controller) {
|
||||
super(settings);
|
||||
controller.registerHandler(GET, "_remote/info", this);
|
||||
this.remoteClusterService = remoteClusterService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client)
|
||||
throws IOException {
|
||||
return channel -> remoteClusterService.getRemoteConnectionInfos(
|
||||
new RestResponseListener<Collection<RemoteConnectionInfo>>(channel) {
|
||||
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(),
|
||||
new RestBuilderListener<RemoteInfoResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(
|
||||
Collection<RemoteConnectionInfo> remoteConnectionInfos) throws Exception {
|
||||
try (XContentBuilder xContentBuilder = channel.newBuilder()) {
|
||||
xContentBuilder.startObject();
|
||||
for (RemoteConnectionInfo info : remoteConnectionInfos) {
|
||||
info.toXContent(xContentBuilder, request);
|
||||
}
|
||||
xContentBuilder.endObject();
|
||||
return new BytesRestResponse(RestStatus.OK, xContentBuilder);
|
||||
}
|
||||
public RestResponse buildResponse(RemoteInfoResponse response, XContentBuilder builder) throws Exception {
|
||||
response.toXContent(builder, request);
|
||||
return new BytesRestResponse(RestStatus.OK, builder);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -113,7 +113,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), null, emptyList(), null,
|
||||
null);
|
||||
actionModule.initRestHandlers(null, null);
|
||||
actionModule.initRestHandlers(null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () ->
|
||||
actionModule.getRestController().registerHandler(Method.GET, "/", null));
|
||||
|
@ -135,7 +135,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
|
||||
singletonList(dupsMainAction), null, null);
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null));
|
||||
assertThat(e.getMessage(), startsWith("Path [/] already has a value [" + RestMainAction.class.getName()));
|
||||
} finally {
|
||||
threadPool.shutdown();
|
||||
|
@ -166,7 +166,7 @@ public class ActionModuleTests extends ESTestCase {
|
|||
ActionModule actionModule = new ActionModule(false, settings.getSettings(), new IndexNameExpressionResolver(Settings.EMPTY),
|
||||
settings.getIndexScopedSettings(), settings.getClusterSettings(), settings.getSettingsFilter(), threadPool,
|
||||
singletonList(registersFakeHandler), null, null);
|
||||
actionModule.initRestHandlers(null, null);
|
||||
actionModule.initRestHandlers(null);
|
||||
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () ->
|
||||
actionModule.getRestController().registerHandler(Method.GET, "/_dummy", null));
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
@ -567,7 +569,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
|
||||
seedNodes, service, maxNumConnections, n -> true)) {
|
||||
// test no nodes connected
|
||||
RemoteConnectionInfo remoteConnectionInfo = getRemoteConnectionInfo(connection);
|
||||
RemoteConnectionInfo remoteConnectionInfo = assertSerialization(getRemoteConnectionInfo(connection));
|
||||
assertNotNull(remoteConnectionInfo);
|
||||
assertEquals(0, remoteConnectionInfo.numNodesConnected);
|
||||
assertEquals(0, remoteConnectionInfo.seedNodes.size());
|
||||
|
@ -582,6 +584,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
remoteConnectionInfo = getRemoteConnectionInfo(connection);
|
||||
remoteConnectionInfo = assertSerialization(remoteConnectionInfo);
|
||||
assertNotNull(remoteConnectionInfo);
|
||||
assertEquals(connection.getNumNodesConnected(), remoteConnectionInfo.numNodesConnected);
|
||||
assertEquals(Math.min(3, maxNumConnections), connection.getNumNodesConnected());
|
||||
|
@ -597,11 +600,75 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRemoteConnectionInfo() throws IOException {
|
||||
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
4, 3, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats);
|
||||
|
||||
RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
4, 4, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster_1",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
4, 3, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
4, 3, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)),
|
||||
4, 3, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
4, 3, TimeValue.timeValueMinutes(325));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
|
||||
stats1 = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
|
||||
5, 3, TimeValue.timeValueMinutes(30));
|
||||
assertSerialization(stats1);
|
||||
assertNotEquals(stats, stats1);
|
||||
}
|
||||
|
||||
private RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException {
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.setVersion(Version.CURRENT);
|
||||
info.writeTo(out);
|
||||
StreamInput in = out.bytes().streamInput();
|
||||
in.setVersion(Version.CURRENT);
|
||||
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(in);
|
||||
assertEquals(info, remoteConnectionInfo);
|
||||
assertEquals(info.hashCode(), remoteConnectionInfo.hashCode());
|
||||
return randomBoolean() ? info : remoteConnectionInfo;
|
||||
}
|
||||
}
|
||||
|
||||
public void testRenderConnectionInfoXContent() throws IOException {
|
||||
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)),
|
||||
4, 3, TimeValue.timeValueMinutes(30));
|
||||
stats = assertSerialization(stats);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
stats.toXContent(builder, null);
|
||||
|
@ -613,6 +680,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
|
||||
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)),
|
||||
2, 0, TimeValue.timeValueSeconds(30));
|
||||
stats = assertSerialization(stats);
|
||||
builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
stats.toXContent(builder, null);
|
||||
|
|
|
@ -26,6 +26,16 @@
|
|||
|
||||
- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}
|
||||
|
||||
# we do another search here since this will enforce the connection to be established
|
||||
# otherwise the cluster might not have been connected yet.
|
||||
- do:
|
||||
search:
|
||||
index: test_remote_cluster:test_index
|
||||
|
||||
- match: { _shards.total: 3 }
|
||||
- match: { hits.total: 6 }
|
||||
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }
|
||||
|
||||
- do:
|
||||
remote.info: {}
|
||||
- set: { my_remote_cluster.http_addresses.0: remote_http }
|
||||
|
|
Loading…
Reference in New Issue