Remove transport client from tests. (#1809)

Transport client has been deprecated. In this commit, we are removing all its usages from the tests.

Signed-off-by: Rabi Panda <adnapibar@gmail.com>
This commit is contained in:
Rabi Panda 2021-12-27 15:43:18 -08:00 committed by GitHub
parent b00b45b25d
commit 24d0c9b818
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
73 changed files with 149 additions and 2884 deletions

View File

@ -61,11 +61,6 @@ public class MultiSearchTemplateIT extends OpenSearchIntegTestCase {
return Collections.singleton(MustachePlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
public void testBasic() throws Exception {
createIndex("msearch");
final int numDocs = randomIntBetween(10, 100);

View File

@ -63,11 +63,6 @@ public abstract class ParentChildTestCase extends OpenSearchIntegTestCase {
return Arrays.asList(InternalSettingsPlugin.class, ParentJoinPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
public Settings indexSettings() {
Settings.Builder builder = Settings.builder()

View File

@ -64,11 +64,6 @@ public class RankEvalRequestIT extends OpenSearchIntegTestCase {
private static final String INDEX_ALIAS = "alias0";
private static final int RELEVANT_RATING_1 = 1;
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(RankEvalPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(RankEvalPlugin.class);

View File

@ -91,11 +91,6 @@ public class ReindexDocumentationIT extends OpenSearchIntegTestCase {
return Arrays.asList(ReindexPlugin.class, ReindexCancellationPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(ReindexPlugin.class);
}
@Before
public void setup() {
client().admin().indices().prepareCreate(INDEX_NAME).get();

View File

@ -54,11 +54,6 @@ public abstract class ReindexTestCase extends OpenSearchIntegTestCase {
return Arrays.asList(ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(ReindexPlugin.class);
}
protected ReindexRequestBuilder reindex() {
return new ReindexRequestBuilder(client(), ReindexAction.INSTANCE);
}

View File

@ -87,11 +87,6 @@ public class RetryTests extends OpenSearchIntegTestCase {
return Arrays.asList(ReindexPlugin.class, Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(ReindexPlugin.class, Netty4Plugin.class);
}
/**
* Lower the queue sizes to be small enough that bulk will time out and have to be retried.
*/

View File

@ -65,20 +65,8 @@ public abstract class OpenSearchNetty4IntegTestCase extends OpenSearchIntegTestC
return builder.build();
}
@Override
protected Settings transportClientSettings() {
Settings.Builder builder = Settings.builder().put(super.transportClientSettings());
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME);
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(Netty4Plugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(Netty4Plugin.class);
}
}

View File

@ -60,7 +60,7 @@ import static org.hamcrest.Matchers.hasSize;
* As the same setting is also used to limit in-flight requests on transport level, we avoid transport messages by forcing
* a single node "cluster". We also force test infrastructure to use the node client instead of the transport client for the same reason.
*/
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numClientNodes = 0, numDataNodes = 1, transportClientRatio = 0)
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numClientNodes = 0, numDataNodes = 1)
public class Netty4HttpRequestSizeLimitIT extends OpenSearchNetty4IntegTestCase {
private static final ByteSizeValue LIMIT = new ByteSizeValue(2, ByteSizeUnit.KB);

View File

@ -60,7 +60,7 @@ import static org.hamcrest.core.Is.is;
// These tests are here today so they have access to a proper REST client. They cannot be in :server:integTest since the REST client needs a
// proper transport implementation, and they cannot be REST tests today since they need to restart nodes. When #35599 and friends land we
// should be able to move these tests to run against a proper cluster instead. TODO do this.
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoManageMasterNodes = false)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class Zen2RestApiIT extends OpenSearchNetty4IntegTestCase {
@Override

View File

@ -32,26 +32,17 @@
package org.opensearch.transport.netty4;
import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.env.Environment;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.junit.annotations.Network;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.Netty4Plugin;
import org.opensearch.transport.TransportInfo;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Locale;
import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.TRANSPORT;
@ -85,22 +76,6 @@ public class Netty4TransportMultiPortIntegrationIT extends OpenSearchNetty4Integ
return builder.build();
}
public void testThatTransportClientCanConnect() throws Exception {
Settings settings = Settings.builder()
.put("cluster.name", internalCluster().getClusterName())
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
// we have to test all the ports that the data node might be bound to
try (TransportClient transportClient = new MockTransportClient(settings, Arrays.asList(Netty4Plugin.class))) {
for (int i = 0; i <= 10; i++) {
transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), randomPort + i));
}
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
}
}
@Network
public void testThatInfosAreExposed() throws Exception {
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().clear().addMetric(TRANSPORT.metricName()).get();

View File

@ -40,7 +40,7 @@ import org.opensearch.test.OpenSearchIntegTestCase;
import static org.hamcrest.Matchers.containsString;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0.0, numClientNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class AzureSimpleTests extends AbstractAzureComputeServiceTestCase {
public void testOneNodeShouldRunUsingPrivateIp() {

View File

@ -38,7 +38,7 @@ import org.opensearch.cloud.azure.classic.management.AzureComputeService.Managem
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0.0, numClientNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class AzureTwoStartedNodesTests extends AbstractAzureComputeServiceTestCase {
public void testTwoNodesShouldRunUsingPrivateOrPublicIp() {

View File

@ -46,7 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
* starting.
* This test requires AWS to run.
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class Ec2DiscoveryUpdateSettingsTests extends AbstractAwsTestCase {
public void testMinimumMasterNodesStart() {
Settings nodeSettings = Settings.builder().put(DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "ec2").build();

View File

@ -65,21 +65,9 @@ public abstract class NioIntegTestCase extends OpenSearchIntegTestCase {
return builder.build();
}
@Override
protected Settings transportClientSettings() {
Settings.Builder builder = Settings.builder().put(super.transportClientSettings());
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME);
return builder.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(NioTransportPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(NioTransportPlugin.class);
}
}

View File

@ -1,367 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.http;
import org.apache.lucene.util.SetOnce;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilter;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
import org.opensearch.client.Client;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.GeoShapeQueryBuilder;
import org.opensearch.index.query.MoreLikeThisQueryBuilder;
import org.opensearch.index.query.MoreLikeThisQueryBuilder.Item;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.indices.TermsLookup;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestHeaderDefinition;
import org.opensearch.script.ScriptService;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.OpenSearchIntegTestCase.Scope.SUITE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ClusterScope(scope = SUITE)
public class ContextAndHeaderTransportIT extends HttpSmokeTestCase {

    // Every ActionRequest observed by LoggingFilter, paired with the thread-context
    // headers that were active when the filter ran. Static + CopyOnWriteArrayList so
    // all nodes' filter instances append to one snapshot-safe list.
    private static final List<RequestAndHeaders> requests = new CopyOnWriteArrayList<>();

    // Registered via CustomHeadersPlugin#getRestHeaders so the REST layer copies this
    // header from incoming HTTP requests into the request thread context.
    private static final RestHeaderDefinition CUSTOM_HEADER = new RestHeaderDefinition("SomeCustomHeader", false);

    // Fresh random value per test instance so assertions cannot pass on stale state.
    private String randomHeaderValue = randomAlphaOfLength(20);
    // Two distinct indices: queries run against queryIndex, indirect lookups
    // (terms lookup / indexed shapes / MLT items) resolve against lookupIndex.
    private String queryIndex = "query-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    private String lookupIndex = "lookup-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        // No extra node settings; kept as an explicit pass-through of the defaults.
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .build();
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
        plugins.add(ActionLoggingPlugin.class);
        plugins.add(CustomHeadersPlugin.class);
        return plugins;
    }

    /**
     * Creates the lookup and query indices with a single shard each (for
     * repeatability) and a mapping containing a geo_shape and a text field,
     * then clears the recorded-request list so each test starts clean.
     */
    @Before
    public void createIndices() throws Exception {
        String mapping = Strings.toString(jsonBuilder().startObject().startObject("type")
            .startObject("properties")
            .startObject("location").field("type", "geo_shape").endObject()
            .startObject("name").field("type", "text").endObject()
            .endObject()
            .endObject().endObject());

        Settings settings = Settings.builder()
            .put(indexSettings())
            .put(SETTING_NUMBER_OF_SHARDS, 1) // A single shard will help to keep the tests repeatable.
            .build();
        assertAcked(transportClient().admin().indices().prepareCreate(lookupIndex)
            .setSettings(settings).addMapping("type", mapping, XContentType.JSON));
        assertAcked(transportClient().admin().indices().prepareCreate(queryIndex)
            .setSettings(settings).addMapping("type", mapping, XContentType.JSON));
        ensureGreen(queryIndex, lookupIndex);
        requests.clear();
    }

    /**
     * After each test, verify the setup traffic itself (index + refresh issued
     * through the header-decorated client) carried the custom header.
     */
    @After
    public void checkAllRequestsContainHeaders() {
        assertRequestsContainHeader(IndexRequest.class);
        assertRequestsContainHeader(RefreshRequest.class);
    }

    // Terms-lookup query triggers an internal GetRequest against lookupIndex;
    // that derived request must inherit the custom header from the search.
    public void testThatTermsLookupGetRequestContainsContextAndHeaders() throws Exception {
        transportClient().prepareIndex(lookupIndex, "type", "1")
            .setSource(jsonBuilder().startObject().array("followers", "foo", "bar", "baz").endObject()).get();
        transportClient().prepareIndex(queryIndex, "type", "1")
            .setSource(jsonBuilder().startObject().field("username", "foo").endObject()).get();
        transportClient().admin().indices().prepareRefresh(queryIndex, lookupIndex).get();

        TermsLookup termsLookup = new TermsLookup(lookupIndex, "type", "1", "followers");
        TermsQueryBuilder termsLookupFilterBuilder = QueryBuilders.termsLookupQuery("username", termsLookup);
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery()).must(termsLookupFilterBuilder);

        SearchResponse searchResponse = transportClient()
            .prepareSearch(queryIndex)
            .setQuery(queryBuilder)
            .get();
        assertNoFailures(searchResponse);
        assertHitCount(searchResponse, 1);

        assertGetRequestsContainHeaders();
    }

    // Indexed-shape geo query fetches the referenced shape via an internal
    // GetRequest; again the derived request must carry the header.
    public void testThatGeoShapeQueryGetRequestContainsContextAndHeaders() throws Exception {
        transportClient().prepareIndex(lookupIndex, "type", "1").setSource(jsonBuilder().startObject()
            .field("name", "Munich Suburban Area")
            .startObject("location")
            .field("type", "polygon")
            .startArray("coordinates").startArray()
            .startArray().value(11.34).value(48.25).endArray()
            .startArray().value(11.68).value(48.25).endArray()
            .startArray().value(11.65).value(48.06).endArray()
            .startArray().value(11.37).value(48.13).endArray()
            .startArray().value(11.34).value(48.25).endArray() // close the polygon
            .endArray().endArray()
            .endObject()
            .endObject())
            .get();
        // second document
        transportClient().prepareIndex(queryIndex, "type", "1").setSource(jsonBuilder().startObject()
            .field("name", "Munich Center")
            .startObject("location")
            .field("type", "point")
            .startArray("coordinates").value(11.57).value(48.13).endArray()
            .endObject()
            .endObject())
            .get();
        transportClient().admin().indices().prepareRefresh(lookupIndex, queryIndex).get();

        GeoShapeQueryBuilder queryBuilder = QueryBuilders.geoShapeQuery("location", "1", "type")
            .indexedShapeIndex(lookupIndex)
            .indexedShapePath("location");

        SearchResponse searchResponse = transportClient()
            .prepareSearch(queryIndex)
            .setQuery(queryBuilder)
            .get();
        assertNoFailures(searchResponse);
        assertHitCount(searchResponse, 1);
        assertThat(requests, hasSize(greaterThan(0)));

        assertGetRequestsContainHeaders();
    }

    // More-like-this with an Item triggers an internal MultiTermVectorsRequest
    // for the referenced document; it must also inherit the header.
    public void testThatMoreLikeThisQueryMultiTermVectorRequestContainsContextAndHeaders() throws Exception {
        transportClient().prepareIndex(lookupIndex, "type", "1")
            .setSource(jsonBuilder().startObject().field("name", "Star Wars - The new republic").endObject())
            .get();
        transportClient().prepareIndex(queryIndex, "type", "1")
            .setSource(jsonBuilder().startObject().field("name", "Jar Jar Binks - A horrible mistake").endObject())
            .get();
        transportClient().prepareIndex(queryIndex, "type", "2")
            .setSource(jsonBuilder().startObject().field("name", "Star Wars - Return of the jedi").endObject())
            .get();
        transportClient().admin().indices().prepareRefresh(lookupIndex, queryIndex).get();

        MoreLikeThisQueryBuilder moreLikeThisQueryBuilder = QueryBuilders.moreLikeThisQuery(new String[]{"name"}, null,
            new Item[]{new Item(lookupIndex, "type", "1")})
            .minTermFreq(1)
            .minDocFreq(1);

        SearchResponse searchResponse = transportClient()
            .prepareSearch(queryIndex)
            .setQuery(moreLikeThisQueryBuilder)
            .get();
        assertNoFailures(searchResponse);
        assertHitCount(searchResponse, 1);

        assertRequestsContainHeader(MultiTermVectorsRequest.class);
    }

    // REST path: only headers registered through getRestHeaders are copied into
    // the thread context; an unregistered header must NOT appear there.
    public void testThatRelevantHttpHeadersBecomeRequestHeaders() throws IOException {
        final String IRRELEVANT_HEADER = "SomeIrrelevantHeader";
        Request request = new Request("GET", "/" + queryIndex + "/_search");
        RequestOptions.Builder options = request.getOptions().toBuilder();
        options.addHeader(CUSTOM_HEADER.getName(), randomHeaderValue);
        options.addHeader(IRRELEVANT_HEADER, randomHeaderValue);
        request.setOptions(options);
        Response response = getRestClient().performRequest(request);
        assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
        List<RequestAndHeaders> searchRequests = getRequests(SearchRequest.class);
        assertThat(searchRequests, hasSize(greaterThan(0)));
        for (RequestAndHeaders requestAndHeaders : searchRequests) {
            assertThat(requestAndHeaders.headers.containsKey(CUSTOM_HEADER.getName()), is(true));
            // was not specified, thus is not included
            assertThat(requestAndHeaders.headers.containsKey(IRRELEVANT_HEADER), is(false));
        }
    }

    // Filters the recorded requests down to those of exactly the given class.
    private List<RequestAndHeaders> getRequests(Class<?> clazz) {
        List<RequestAndHeaders> results = new ArrayList<>();
        for (RequestAndHeaders request : requests) {
            if (request.request.getClass().equals(clazz)) {
                results.add(request);
            }
        }
        return results;
    }

    // Asserts every recorded request of the given class carried the custom header.
    // NOTE(review): vacuously passes when no request of that class was recorded.
    private void assertRequestsContainHeader(Class<? extends ActionRequest> clazz) {
        List<RequestAndHeaders> classRequests = getRequests(clazz);
        for (RequestAndHeaders request : classRequests) {
            assertRequestContainsHeader(request.request, request.headers);
        }
    }

    private void assertGetRequestsContainHeaders() {
        assertGetRequestsContainHeaders(this.lookupIndex);
    }

    // Asserts at least one GetRequest was recorded and that every GetRequest
    // targeting the given index carried the custom header.
    private void assertGetRequestsContainHeaders(String index) {
        List<RequestAndHeaders> getRequests = getRequests(GetRequest.class);
        assertThat(getRequests, hasSize(greaterThan(0)));

        for (RequestAndHeaders request : getRequests) {
            if (!((GetRequest)request.request).index().equals(index)) {
                continue;
            }
            assertRequestContainsHeader(request.request, request.headers);
        }
    }

    // Checks the captured context map contains the custom header with the
    // expected random value; builds a request-specific failure message first.
    private void assertRequestContainsHeader(ActionRequest request, Map<String, String> context) {
        String msg = String.format(Locale.ROOT, "Expected header %s to be in request %s", CUSTOM_HEADER.getName(),
            request.getClass().getName());
        if (request instanceof IndexRequest) {
            IndexRequest indexRequest = (IndexRequest) request;
            msg = String.format(Locale.ROOT, "Expected header %s to be in index request %s/%s/%s", CUSTOM_HEADER.getName(),
                indexRequest.index(), indexRequest.type(), indexRequest.id());
        }
        assertThat(msg, context.containsKey(CUSTOM_HEADER.getName()), is(true));
        assertThat(context.get(CUSTOM_HEADER.getName()).toString(), is(randomHeaderValue));
    }

    /**
     * a transport client that adds our random header
     */
    private Client transportClient() {
        return internalCluster().transportClient().filterWithHeader(Collections.singletonMap(CUSTOM_HEADER.getName(), randomHeaderValue));
    }

    // Plugin that installs LoggingFilter so every action request (and its
    // thread-context headers) is recorded into the static `requests` list.
    public static class ActionLoggingPlugin extends Plugin implements ActionPlugin {

        private final SetOnce<LoggingFilter> loggingFilter = new SetOnce<>();

        @Override
        public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
                                                   ResourceWatcherService resourceWatcherService, ScriptService scriptService,
                                                   NamedXContentRegistry xContentRegistry, Environment environment,
                                                   NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
                                                   IndexNameExpressionResolver indexNameExpressionResolver,
                                                   Supplier<RepositoriesService> repositoriesServiceSupplier) {
            // Created here (rather than in the field initializer) so the filter
            // can capture the node's ThreadPool; SetOnce guards double-init.
            loggingFilter.set(new LoggingFilter(threadPool));
            return Collections.emptyList();
        }

        @Override
        public List<ActionFilter> getActionFilters() {
            return singletonList(loggingFilter.get());
        }
    }

    // Action filter that snapshots the current thread-context headers for each
    // request, then lets the request proceed unchanged.
    public static class LoggingFilter extends ActionFilter.Simple {

        private final ThreadPool threadPool;

        public LoggingFilter(ThreadPool pool) {
            this.threadPool = pool;
        }

        @Override
        public int order() {
            // High order value so this filter runs late in the chain.
            return 999;
        }

        @Override
        protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
            requests.add(new RequestAndHeaders(threadPool.getThreadContext().getHeaders(), request));
            return true; // continue processing the request normally
        }
    }

    // Immutable pairing of a recorded request with the headers seen alongside it.
    private static class RequestAndHeaders {
        final Map<String, String> headers;
        final ActionRequest request;

        private RequestAndHeaders(Map<String, String> headers, ActionRequest request) {
            this.headers = headers;
            this.request = request;
        }
    }

    // Declares CUSTOM_HEADER as a REST header so the HTTP layer copies it into
    // the request thread context.
    public static class CustomHeadersPlugin extends Plugin implements ActionPlugin {
        public Collection<RestHeaderDefinition> getRestHeaders() {
            return Collections.singleton(CUSTOM_HEADER);
        }
    }
}

View File

@ -95,19 +95,6 @@ public abstract class HttpSmokeTestCase extends OpenSearchIntegTestCase {
return Arrays.asList(getTestTransportPlugin(), Netty4Plugin.class, NioTransportPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(getTestTransportPlugin(), Netty4Plugin.class, NioTransportPlugin.class);
}
@Override
protected Settings transportClientSettings() {
return Settings.builder()
.put(super.transportClientSettings())
.put(NetworkModule.TRANSPORT_TYPE_KEY, clientTypeKey)
.build();
}
@Override
protected boolean ignoreExternalCluster() {
return true;

View File

@ -36,7 +36,6 @@ import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.test.OpenSearchIntegTestCase;
import java.util.concurrent.CountDownLatch;
@ -72,11 +71,6 @@ public class ListenerActionIT extends OpenSearchIntegTestCase {
latch.await();
boolean shouldBeThreaded = TransportClient.CLIENT_TYPE.equals(Client.CLIENT_TYPE_SETTING_S.get(client.settings()));
if (shouldBeThreaded) {
assertTrue(threadName.get().contains("listener"));
} else {
assertFalse(threadName.get().contains("listener"));
}
}
}

View File

@ -38,7 +38,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.containsString;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClientTimeoutIT extends OpenSearchIntegTestCase {
@Override

View File

@ -570,11 +570,4 @@ public class CancellableTasksIT extends OpenSearchIntegTestCase {
plugins.add(TaskPlugin.class);
return plugins;
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.transportClientPlugins());
plugins.add(TaskPlugin.class);
return plugins;
}
}

View File

@ -119,7 +119,7 @@ import static org.hamcrest.Matchers.startsWith;
* <p>
* We need at least 2 nodes so we have a master node a non-master node
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2, transportClientRatio = 0.0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends OpenSearchIntegTestCase {
private Map<Tuple<String, String>, RecordingTaskManagerListener> listeners = new HashMap<>();
@ -136,11 +136,6 @@ public class TasksIT extends OpenSearchIntegTestCase {
return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()

View File

@ -60,7 +60,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TransportClusterStateActionDisruptionIT extends OpenSearchIntegTestCase {
@Override

View File

@ -44,7 +44,7 @@ import java.io.IOException;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertRequestBuilderThrows;
@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0.0, autoManageMasterNodes = false)
@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, autoManageMasterNodes = false)
public class IndicesExistsIT extends OpenSearchIntegTestCase {
public void testIndexExistsWithBlocksInPlace() throws IOException {

View File

@ -48,9 +48,7 @@ import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.env.Environment;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.MockTransportClient;
import java.util.Arrays;
import java.util.HashSet;
@ -185,51 +183,6 @@ public class BulkProcessorIT extends OpenSearchIntegTestCase {
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
// https://github.com/elastic/elasticsearch/issues/5038
public void testBulkProcessorConcurrentRequestsNoNodeAvailableException() throws Exception {
// we create a transport client with no nodes to make sure it throws NoNodeAvailableException
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Client transportClient = new MockTransportClient(settings);
int bulkActions = randomIntBetween(10, 100);
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
int concurrentRequests = randomIntBetween(0, 10);
int expectedBulkActions = numDocs / bulkActions;
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
try (
BulkProcessor processor = BulkProcessor.builder(transportClient, listener)
.setConcurrentRequests(concurrentRequests)
.setBulkActions(bulkActions)
// set interval and size to high values
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.build()
) {
indexDocs(transportClient, processor, numDocs);
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
assertThat(listener.bulkFailures.size(), equalTo(expectedBulkActions));
assertThat(listener.bulkItems.size(), equalTo(0));
}
closeLatch.await();
assertThat(listener.bulkFailures.size(), equalTo(totalExpectedBulkActions));
assertThat(listener.bulkItems.size(), equalTo(0));
transportClient.close();
}
public void testBulkProcessorWaitOnClose() throws Exception {
BulkProcessorTestListener listener = new BulkProcessorTestListener();

View File

@ -84,11 +84,6 @@ public class TransportReplicationActionRetryOnClosedNodeIT extends OpenSearchInt
return Arrays.asList(TestPlugin.class, MockTransportService.TestPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(TestPlugin.class);
}
public static class Request extends ReplicationRequest<Request> {
public Request(ShardId shardId) {
super(shardId);

View File

@ -1,106 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static org.opensearch.client.transport.TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL;
// Integration tests verifying that a TransportClient invokes its host-failure listener
// when a connected node goes away. (TransportClient is deprecated; this file is being
// deleted by the enclosing commit.)
@ClusterScope(scope = Scope.TEST)
public class NodeDisconnectIT extends OpenSearchIntegTestCase {

    /**
     * With sniffing effectively disabled (1h sampler interval), stop a data node and
     * fire requests until one hits the dead node; the disconnect listener must be
     * notified exactly once for that node.
     */
    public void testNotifyOnDisconnect() throws IOException {
        internalCluster().ensureAtLeastNumDataNodes(2);
        // Collects every node reported to the client's host-failure listener.
        final Set<DiscoveryNode> disconnectedNodes = Collections.synchronizedSet(new HashSet<>());
        try (
            TransportClient client = new MockTransportClient(
                Settings.builder()
                    .put("cluster.name", internalCluster().getClusterName())
                    .put(CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.getKey(), "1h") // disable sniffing for better control
                    .build(),
                Collections.emptySet(),
                (n, e) -> disconnectedNodes.add(n) // host-failure listener under test
            )
        ) {
            // Register every node's published address with the client.
            for (TransportService service : internalCluster().getInstances(TransportService.class)) {
                client.addTransportAddress(service.boundAddress().publishAddress());
            }
            internalCluster().stopRandomDataNode();
            for (int i = 0; i < 20; i++) { // fire up requests such that we hit the node and pass it to the listener
                client.admin().cluster().prepareState().get();
            }
            assertEquals(1, disconnectedNodes.size());
        }
        // The listener must not fire again when the client itself is closed.
        assertEquals(1, disconnectedNodes.size());
    }

    /**
     * Same guarantee, but the disconnect is observed through the node sampler
     * (sniffer) rather than through a request hitting the dead node.
     */
    public void testNotifyOnDisconnectInSniffer() throws IOException {
        internalCluster().ensureAtLeastNumDataNodes(2);
        final Set<DiscoveryNode> disconnectedNodes = Collections.synchronizedSet(new HashSet<>());
        try (
            TransportClient client = new MockTransportClient(
                Settings.builder().put("cluster.name", internalCluster().getClusterName()).build(),
                Collections.emptySet(),
                (n, e) -> disconnectedNodes.add(n)
            )
        ) {
            int numNodes = 0;
            for (TransportService service : internalCluster().getInstances(TransportService.class)) {
                numNodes++;
                client.addTransportAddress(service.boundAddress().publishAddress());
            }
            // Sanity check: the client is connected to every registered node.
            Set<TransportAddress> discoveryNodes = client.connectedNodes().stream().map(n -> n.getAddress()).collect(Collectors.toSet());
            assertEquals(numNodes, discoveryNodes.size());
            assertEquals(0, disconnectedNodes.size());
            internalCluster().stopRandomDataNode();
            // Trigger one sampler pass explicitly so the dead node is detected now.
            client.getNodesService().doSample();
            assertEquals(1, disconnectedNodes.size());
            // The reported node must be one of the addresses we originally connected to.
            assertTrue(discoveryNodes.contains(disconnectedNodes.stream().findAny().get().getAddress()));
        }
        // Closing the client must not produce additional disconnect notifications.
        assertEquals(1, disconnectedNodes.size());
    }
}

View File

@ -1,127 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.Version;
import org.opensearch.client.Client;
import org.opensearch.cluster.coordination.ClusterBootstrapService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.env.Environment;
import org.opensearch.node.MockNode;
import org.opensearch.node.Node;
import org.opensearch.node.NodeValidationException;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.MockHttpTransport;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import static org.opensearch.test.NodeRoles.nonDataNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
// Integration tests for basic TransportClient behavior: node-role visibility, version
// propagation for connected/listed/filtered nodes, and the "client.type" setting.
// (TransportClient is deprecated; this file is being deleted by the enclosing commit.)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 1.0)
public class TransportClientIT extends OpenSearchIntegTestCase {

    // A client connected to a non-data node must see that node's data role as false.
    public void testPickingUpChangesInDiscoveryNode() {
        String nodeName = internalCluster().startNode(nonDataNode());
        // transportClientRatio = 1.0 forces internalCluster().client() to be a TransportClient.
        TransportClient client = (TransportClient) internalCluster().client(nodeName);
        assertThat(client.connectedNodes().get(0).isDataNode(), equalTo(false));
    }

    /**
     * Connected nodes report their real (current) version, while merely listed
     * (unconnected) nodes and filtered nodes are assumed to be at the minimum
     * compatible version until a connection upgrades that information.
     */
    public void testNodeVersionIsUpdated() throws IOException, NodeValidationException {
        TransportClient client = (TransportClient) internalCluster().client();
        try (
            Node node = new MockNode(
                Settings.builder()
                    .put(internalCluster().getDefaultSettings())
                    .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
                    .put("node.name", "testNodeVersionIsUpdated")
                    .put("transport.type", getTestTransportType())
                    .put(nonDataNode())
                    .put("cluster.name", "foobar") // deliberately different cluster name; NOTE(review): presumably so the node ends up filtered — confirm
                    .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), "testNodeVersionIsUpdated")
                    .build(),
                Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class)
            ).start()
        ) {
            TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
            client.addTransportAddress(transportAddress);
            // since we force transport clients there has to be one node started that we connect to.
            assertThat(client.connectedNodes().size(), greaterThanOrEqualTo(1));
            // connected nodes have updated version
            for (DiscoveryNode discoveryNode : client.connectedNodes()) {
                assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT));
            }
            // Listed nodes are synthetic "#transport#-" entries pinned to the minimum compatible version.
            for (DiscoveryNode discoveryNode : client.listedNodes()) {
                assertThat(discoveryNode.getId(), startsWith("#transport#-"));
                assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
            }
            assertThat(client.filteredNodes().size(), equalTo(1));
            for (DiscoveryNode discoveryNode : client.filteredNodes()) {
                assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT.minimumCompatibilityVersion()));
            }
        }
    }

    // The injected settings of a transport client must carry client.type = "transport".
    public void testThatTransportClientSettingIsSet() {
        TransportClient client = (TransportClient) internalCluster().client();
        Settings settings = client.injector.getInstance(Settings.class);
        assertThat(Client.CLIENT_TYPE_SETTING_S.get(settings), is("transport"));
    }

    // Even when constructed from user-supplied base settings, the client type stays "transport".
    public void testThatTransportClientSettingCannotBeChanged() {
        String transport = getTestTransportType();
        Settings baseSettings = Settings.builder()
            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
            .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
            .build();
        try (TransportClient client = new MockTransportClient(baseSettings)) {
            Settings settings = client.injector.getInstance(Settings.class);
            assertThat(Client.CLIENT_TYPE_SETTING_S.get(settings), is("transport"));
        }
    }
}

View File

@ -1,99 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.env.Environment;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
// Verifies that a TransportClient transparently retries requests against the remaining
// nodes as cluster nodes are stopped one by one. (TransportClient is deprecated; this
// file is being deleted by the enclosing commit.)
@ClusterScope(scope = Scope.TEST, numClientNodes = 0, supportsDedicatedMasters = false)
public class TransportClientRetryIT extends OpenSearchIntegTestCase {

    public void testRetry() throws IOException, ExecutionException, InterruptedException {
        // Collect the published transport addresses of every node in the cluster.
        Iterable<TransportService> instances = internalCluster().getInstances(TransportService.class);
        TransportAddress[] addresses = new TransportAddress[internalCluster().size()];
        int i = 0;
        for (TransportService instance : instances) {
            addresses[i++] = instance.boundAddress().publishAddress();
        }
        String transport = getTestTransportType();
        Settings.Builder builder = Settings.builder()
            .put("client.transport.nodes_sampler_interval", "1s") // re-sample quickly so dead nodes drop out
            .put("node.name", "transport_client_retry_test")
            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), internalCluster().getClusterName())
            .put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir());
        try (TransportClient client = new MockTransportClient(builder.build())) {
            client.addTransportAddresses(addresses);
            assertEquals(client.connectedNodes().size(), internalCluster().size());
            int size = cluster().size();
            // kill all nodes one by one, leaving a single master/data node at the end of the loop
            for (int j = 1; j < size; j++) {
                internalCluster().stopRandomNode(input -> true);
                // local(true): answer from whichever node the request lands on, no master round-trip.
                ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().local(true);
                ClusterState clusterState;
                // use both variants of execute method: with and without listener
                if (randomBoolean()) {
                    clusterState = client.admin().cluster().state(clusterStateRequest).get().getState();
                } else {
                    PlainActionFuture<ClusterStateResponse> future = PlainActionFuture.newFuture();
                    client.admin().cluster().state(clusterStateRequest, future);
                    clusterState = future.get().getState();
                }
                // After j kills, at least (size - j) nodes must still be visible and connected.
                assertThat(clusterState.nodes().getSize(), greaterThanOrEqualTo(size - j));
                assertThat(client.connectedNodes().size(), greaterThanOrEqualTo(size - j));
            }
        }
    }
}

View File

@ -1,369 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.cluster;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.ParseField;
import org.opensearch.common.Priority;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.test.OpenSearchIntegTestCase.Scope.TEST;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
/**
 * This test suite sets up a situation where the cluster has two plugins installed (node, and node-and-transport-client), and a transport
 * client only has the node-and-transport-client plugin installed. Each of these plugins injects a custom into the cluster state, and we
 * verify that the client can deserialize a cluster-state response because the response omits customs the transport client does not
 * understand (it only presents the node-and-transport-client feature). (This file is being deleted by the enclosing commit, which removes
 * transport-client usage from tests.)
 */
@OpenSearchIntegTestCase.ClusterScope(scope = TEST)
public class ClusterStateIT extends OpenSearchIntegTestCase {

    /** Base class for the test-only metadata customs: a single serialized int value. */
    public abstract static class Custom implements Metadata.Custom {

        private static final ParseField VALUE = new ParseField("value");

        // The payload carried in the cluster state; used to assert round-tripping.
        private final int value;

        int value() {
            return value;
        }

        Custom(final int value) {
            this.value = value;
        }

        Custom(final StreamInput in) throws IOException {
            value = in.readInt();
        }

        @Override
        public EnumSet<Metadata.XContentContext> context() {
            return Metadata.ALL_CONTEXTS;
        }

        @Override
        public Diff<Metadata.Custom> diff(final Metadata.Custom previousState) {
            // No diffing needed for this test; full-state serialization is used.
            return null;
        }

        @Override
        public void writeTo(final StreamOutput out) throws IOException {
            out.writeInt(value);
        }

        @Override
        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.field(VALUE.getPreferredName(), value);
            return builder;
        }
    }

    /** Custom requiring the "node" feature — should be filtered out of transport-client responses. */
    public static class NodeCustom extends Custom {

        public static final String TYPE = "node";

        NodeCustom(final int value) {
            super(value);
        }

        NodeCustom(final StreamInput in) throws IOException {
            super(in);
        }

        @Override
        public String getWriteableName() {
            return TYPE;
        }

        @Override
        public Version getMinimalSupportedVersion() {
            return Version.CURRENT;
        }

        @Override
        public Optional<String> getRequiredFeature() {
            return Optional.of("node");
        }
    }

    /** Custom the transport client understands — must always survive in the response. */
    public static class NodeAndTransportClientCustom extends Custom {

        public static final String TYPE = "node-and-transport-client";

        NodeAndTransportClientCustom(final int value) {
            super(value);
        }

        public NodeAndTransportClientCustom(final StreamInput in) throws IOException {
            super(in);
        }

        @Override
        public String getWriteableName() {
            return TYPE;
        }

        @Override
        public Version getMinimalSupportedVersion() {
            return Version.CURRENT;
        }

        /*
         * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have
         * versus not requiring any feature. We use a field to make the random choice exactly once.
         */
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
        private final Optional<String> requiredFeature = randomBoolean() ? Optional.empty() : Optional.of("node-and-transport-client");

        @Override
        public Optional<String> getRequiredFeature() {
            return requiredFeature;
        }
    }

    /**
     * Base plugin that registers one custom and, via a cluster-state listener on the elected
     * master, installs a single instance of it into the cluster-state metadata exactly once.
     */
    public abstract static class CustomPlugin extends Plugin {

        private final List<NamedWriteableRegistry.Entry> namedWritables = new ArrayList<>();
        private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();

        public CustomPlugin() {
            registerBuiltinWritables();
        }

        // Registers both the wire (writeable) and x-content handlers for a metadata custom.
        protected <T extends Metadata.Custom> void registerMetadataCustom(
            final String name,
            final Writeable.Reader<T> reader,
            final CheckedFunction<XContentParser, T, IOException> parser
        ) {
            namedWritables.add(new NamedWriteableRegistry.Entry(Metadata.Custom.class, name, reader));
            namedXContents.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(name), parser));
        }

        protected abstract void registerBuiltinWritables();

        protected abstract String getType();

        protected abstract Custom getInstance();

        @Override
        public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
            return namedWritables;
        }

        @Override
        public List<NamedXContentRegistry.Entry> getNamedXContent() {
            return namedXContents;
        }

        // Guards against submitting the install task more than once from repeated cluster-state events.
        private final AtomicBoolean installed = new AtomicBoolean();

        @Override
        public Collection<Object> createComponents(
            final Client client,
            final ClusterService clusterService,
            final ThreadPool threadPool,
            final ResourceWatcherService resourceWatcherService,
            final ScriptService scriptService,
            final NamedXContentRegistry xContentRegistry,
            final Environment environment,
            final NodeEnvironment nodeEnvironment,
            final NamedWriteableRegistry namedWriteableRegistry,
            final IndexNameExpressionResolver indexNameExpressionResolver,
            final Supplier<RepositoriesService> repositoriesServiceSupplier
        ) {
            clusterService.addListener(event -> {
                final ClusterState state = event.state();
                // Wait until the cluster state has been recovered before touching metadata.
                if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
                    return;
                }
                final Metadata metadata = state.metadata();
                // Only the elected master installs the custom, and only once.
                if (state.nodes().isLocalNodeElectedMaster()) {
                    if (metadata.custom(getType()) == null) {
                        if (installed.compareAndSet(false, true)) {
                            clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) {

                                @Override
                                public ClusterState execute(ClusterState currentState) {
                                    // Re-check under the update task: another task may have installed it already.
                                    if (currentState.custom(getType()) == null) {
                                        final Metadata.Builder builder = Metadata.builder(currentState.metadata());
                                        builder.putCustom(getType(), getInstance());
                                        return ClusterState.builder(currentState).metadata(builder).build();
                                    } else {
                                        return currentState;
                                    }
                                }

                                @Override
                                public void onFailure(String source, Exception e) {
                                    throw new AssertionError(e);
                                }

                            });
                        }
                    }
                }
            });
            return Collections.emptyList();
        }
    }

    /** Installs {@link NodeCustom}; advertises only the "node" feature. */
    public static class NodePlugin extends CustomPlugin {

        // NOTE(review): declared public without @Override here, while the sibling plugin below
        // declares it protected with @Override — confirm which signature the base Plugin expects.
        public Optional<String> getFeature() {
            return Optional.of("node");
        }

        static final int VALUE = randomInt();

        @Override
        protected void registerBuiltinWritables() {
            registerMetadataCustom(
                NodeCustom.TYPE,
                NodeCustom::new,
                parser -> { throw new IOException(new UnsupportedOperationException()); } // parsing is not exercised by this test
            );
        }

        @Override
        protected String getType() {
            return NodeCustom.TYPE;
        }

        @Override
        protected Custom getInstance() {
            return new NodeCustom(VALUE);
        }
    }

    /** Installs {@link NodeAndTransportClientCustom}; advertises the shared feature. */
    public static class NodeAndTransportClientPlugin extends CustomPlugin {

        @Override
        protected Optional<String> getFeature() {
            return Optional.of("node-and-transport-client");
        }

        static final int VALUE = randomInt();

        @Override
        protected void registerBuiltinWritables() {
            registerMetadataCustom(
                NodeAndTransportClientCustom.TYPE,
                NodeAndTransportClientCustom::new,
                parser -> { throw new IOException(new UnsupportedOperationException()); } // parsing is not exercised by this test
            );
        }

        @Override
        protected String getType() {
            return NodeAndTransportClientCustom.TYPE;
        }

        @Override
        protected Custom getInstance() {
            return new NodeAndTransportClientCustom(VALUE);
        }
    }

    // Nodes carry both plugins...
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(NodePlugin.class, NodeAndTransportClientPlugin.class);
    }

    // ...while the transport client only carries the shared one.
    @Override
    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
        return Collections.singletonList(NodeAndTransportClientPlugin.class);
    }

    /**
     * End-to-end check: both customs reach the cluster state, but the transport client's
     * state response contains only the customs whose required feature it presents.
     */
    public void testOptionalCustoms() throws Exception {
        // ensure that the customs are injected into the cluster state
        assertBusy(() -> assertTrue(clusterService().state().metadata().customs().containsKey(NodeCustom.TYPE)));
        assertBusy(() -> assertTrue(clusterService().state().metadata().customs().containsKey(NodeAndTransportClientCustom.TYPE)));
        final ClusterStateResponse state = internalCluster().transportClient().admin().cluster().prepareState().get();
        final ImmutableOpenMap<String, Metadata.Custom> customs = state.getState().metadata().customs();
        final Set<String> keys = new HashSet<>(Arrays.asList(customs.keys().toArray(String.class)));
        assertThat(keys, hasItem(IndexGraveyard.TYPE));
        // The "node"-feature custom must have been filtered out for this client.
        assertThat(keys, not(hasItem(NodeCustom.TYPE)));
        assertThat(keys, hasItem(NodeAndTransportClientCustom.TYPE));
        // The surviving custom round-trips with its original value.
        final Metadata.Custom actual = customs.get(NodeAndTransportClientCustom.TYPE);
        assertThat(actual, instanceOf(NodeAndTransportClientCustom.class));
        assertThat(((NodeAndTransportClientCustom) actual).value(), equalTo(NodeAndTransportClientPlugin.VALUE));
    }
}

View File

@ -82,7 +82,7 @@ import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class RareClusterStateIT extends OpenSearchIntegTestCase {
@Override

View File

@ -93,11 +93,6 @@ public class DiscoveryNodeRoleIT extends OpenSearchIntegTestCase {
return Collections.singletonList(AdditionalRolePlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(AdditionalRolePlugin.class);
}
public void testDefaultHasAdditionalRole() {
runTestNodeHasAdditionalRole(Settings.EMPTY);
}

View File

@ -48,7 +48,7 @@ import org.opensearch.test.transport.MockTransportService;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClusterDisruptionCleanSettingsIT extends OpenSearchIntegTestCase {
@Override

View File

@ -95,7 +95,7 @@ import static org.hamcrest.Matchers.oneOf;
/**
* Tests various cluster operations (e.g., indexing) during disruptions.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
private enum ConflictMode {

View File

@ -57,7 +57,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHAR
/**
* Tests for discovery during disruptions.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
/**

View File

@ -62,7 +62,7 @@ import static org.hamcrest.Matchers.not;
/**
* Tests relating to the loss of the master.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MasterDisruptionIT extends AbstractDisruptionTestCase {
/**

View File

@ -71,7 +71,7 @@ import static org.hamcrest.Matchers.is;
/**
* Tests snapshot operations during disruptions.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
@Override

View File

@ -76,7 +76,7 @@ import static org.hamcrest.Matchers.equalTo;
* Tests relating to the loss of the master, but which work with the default fault detection settings which are rather lenient and will
* not detect a master failure too quickly.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class StableMasterDisruptionIT extends OpenSearchIntegTestCase {
@Override

View File

@ -107,7 +107,7 @@ public class SingleNodeDiscoveryIT extends OpenSearchIntegTestCase {
Function.identity()
)
) {
other.beforeTest(random(), 0);
other.beforeTest(random());
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
final ClusterState second = other.getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
@ -174,7 +174,7 @@ public class SingleNodeDiscoveryIT extends OpenSearchIntegTestCase {
Function.identity()
)
) {
other.beforeTest(random(), 0);
other.beforeTest(random());
final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1));
assertBusy(() -> mockAppender.assertAllExpectationsMatched());

View File

@ -66,7 +66,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
public class IndexingPressureIT extends OpenSearchIntegTestCase {
public static final String INDEX_NAME = "test";

View File

@ -43,7 +43,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
public class ShardIndexingPressureIT extends OpenSearchIntegTestCase {
public static final String INDEX_NAME = "test_index";

View File

@ -43,7 +43,7 @@ import java.util.stream.Stream;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1, transportClientRatio = 0.0D)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
public class ShardIndexingPressureSettingsIT extends OpenSearchIntegTestCase {
public static final String INDEX_NAME = "test_index";

View File

@ -48,11 +48,6 @@ public class CustomQueryParserIT extends OpenSearchIntegTestCase {
return Arrays.asList(DummyQueryParserPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(DummyQueryParserPlugin.class);
}
@Override
@Before
public void setUp() throws Exception {

View File

@ -70,7 +70,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResp
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, supportsDedicatedMasters = false, numClientNodes = 1, transportClientRatio = 0.0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, supportsDedicatedMasters = false, numClientNodes = 1)
public class ExceptionRetryIT extends OpenSearchIntegTestCase {
@Override

View File

@ -51,11 +51,6 @@ public class InternalSettingsIT extends OpenSearchIntegTestCase {
return Collections.singleton(InternalOrPrivateSettingsPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(InternalOrPrivateSettingsPlugin.class);
}
public void testSetInternalIndexSettingOnCreate() {
final Settings settings = Settings.builder().put("index.internal", "internal").build();
createIndex("index", settings);

View File

@ -54,11 +54,6 @@ public class PrivateSettingsIT extends OpenSearchIntegTestCase {
return Collections.singletonList(InternalOrPrivateSettingsPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(InternalOrPrivateSettingsPlugin.class);
}
public void testSetPrivateIndexSettingOnCreate() {
final Settings settings = Settings.builder().put("index.private", "private").build();
final Exception e = expectThrows(Exception.class, () -> createIndex("index", settings));

View File

@ -56,11 +56,6 @@ public class PersistentTasksExecutorFullRestartIT extends OpenSearchIntegTestCas
return Collections.singletonList(TestPersistentTasksPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
protected boolean ignoreExternalCluster() {
return true;
}

View File

@ -72,11 +72,6 @@ public class PersistentTasksExecutorIT extends OpenSearchIntegTestCase {
return Collections.singletonList(TestPersistentTasksPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
protected boolean ignoreExternalCluster() {
return true;
}

View File

@ -62,11 +62,6 @@ public class EnableAssignmentDeciderIT extends OpenSearchIntegTestCase {
return singletonList(TestPersistentTasksPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@Override
protected boolean ignoreExternalCluster() {
return true;

View File

@ -51,7 +51,7 @@ import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0.0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class FullRollingRestartIT extends OpenSearchIntegTestCase {
protected void assertTimeout(ClusterHealthRequestBuilder requestBuilder) {
ClusterHealthResponse clusterHealth = requestBuilder.get();

View File

@ -74,11 +74,6 @@ public class FetchSubPhasePluginIT extends OpenSearchIntegTestCase {
return Collections.singletonList(FetchTermVectorsPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
@SuppressWarnings("unchecked")
public void testPlugin() throws Exception {
client().admin()

View File

@ -72,11 +72,6 @@ public class FunctionScorePluginIT extends OpenSearchIntegTestCase {
return Arrays.asList(CustomDistanceScorePlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(CustomDistanceScorePlugin.class);
}
public void testPlugin() throws Exception {
client().admin()
.indices()

View File

@ -477,11 +477,11 @@ public class SearchScrollIT extends OpenSearchIntegTestCase {
assertToXContentResponse(clearResponse, true, clearResponse.getNumFreed());
assertRequestBuilderThrows(
internalCluster().transportClient().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)),
internalCluster().client().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)),
RestStatus.NOT_FOUND
);
assertRequestBuilderThrows(
internalCluster().transportClient().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)),
internalCluster().client().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)),
RestStatus.NOT_FOUND
);
}
@ -530,10 +530,7 @@ public class SearchScrollIT extends OpenSearchIntegTestCase {
ClearScrollResponse clearScrollResponse = client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
assertThat(clearScrollResponse.isSucceeded(), is(true));
assertRequestBuilderThrows(
internalCluster().transportClient().prepareSearchScroll(searchResponse.getScrollId()),
RestStatus.NOT_FOUND
);
assertRequestBuilderThrows(internalCluster().client().prepareSearchScroll(searchResponse.getScrollId()), RestStatus.NOT_FOUND);
}
public void testStringSortMissingAscTerminates() throws Exception {

View File

@ -140,7 +140,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
public static class TestCustomMetadataPlugin extends Plugin {

View File

@ -98,7 +98,7 @@ public class MultiClusterRepoAccessIT extends AbstractSnapshotIntegTestCase {
),
Function.identity()
);
secondCluster.beforeTest(random(), 0);
secondCluster.beforeTest(random());
}
@After

View File

@ -51,7 +51,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasSize;
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
@Override

View File

@ -38,7 +38,6 @@ import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.hamcrest.RegexMatcher;
import java.lang.management.ManagementFactory;
@ -109,8 +108,6 @@ public class SimpleThreadPoolIT extends OpenSearchIntegTestCase {
continue;
}
String nodePrefix = "("
+ Pattern.quote(InternalTestCluster.TRANSPORT_CLIENT_PREFIX)
+ ")?("
+ Pattern.quote(OpenSearchIntegTestCase.SUITE_CLUSTER_NODE_PREFIX)
+ "|"
+ Pattern.quote(OpenSearchIntegTestCase.TEST_CLUSTER_NODE_PREFIX)

View File

@ -133,7 +133,7 @@ import static org.hamcrest.Matchers.greaterThan;
* stale or dirty, i.e., come from a stale primary or belong to a write that ends up being discarded.</li>
* </ul>
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 4, maxNumDataNodes = 6, transportClientRatio = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 4, maxNumDataNodes = 6)
public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
private static final Pattern EXTRACT_VERSION = Pattern.compile("current document has seqNo \\[(\\d+)\\] and primary term \\[(\\d+)\\]");

View File

@ -321,6 +321,7 @@ public class Node implements Closeable {
private final Collection<LifecycleComponent> pluginLifecycleComponents;
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
final NamedWriteableRegistry namedWriteableRegistry;
public Node(Environment environment) {
this(environment, Collections.emptyList(), true);
@ -963,6 +964,8 @@ public class Node implements Closeable {
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
this.namedWriteableRegistry = namedWriteableRegistry;
logger.debug("initializing HTTP handlers ...");
actionModule.initRestHandlers(() -> clusterService.state().nodes());
logger.info("initialized");

View File

@ -32,48 +32,34 @@
package org.opensearch.action.search;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.mockito.Mockito;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.MockTransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import static org.hamcrest.CoreMatchers.equalTo;
public class SearchRequestBuilderTests extends OpenSearchTestCase {
private static Client client;
@BeforeClass
public static void initClient() {
// this client will not be hit by any request, but it needs to be a non null proper client
// that is why we create it but we don't add any transport address to it
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
client = new MockTransportClient(settings);
}
@AfterClass
public static void closeClient() {
client.close();
client = null;
private SearchRequestBuilder createBuilder() {
OpenSearchClient client = Mockito.mock(OpenSearchClient.class);
return new SearchRequestBuilder(client, SearchAction.INSTANCE);
}
public void testEmptySourceToString() {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
SearchRequestBuilder searchRequestBuilder = createBuilder();
assertThat(searchRequestBuilder.toString(), equalTo(new SearchSourceBuilder().toString()));
}
public void testQueryBuilderQueryToString() {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
SearchRequestBuilder searchRequestBuilder = createBuilder();
searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
assertThat(searchRequestBuilder.toString(), equalTo(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).toString()));
}
public void testSearchSourceBuilderToString() {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
SearchRequestBuilder searchRequestBuilder = createBuilder();
searchRequestBuilder.setSource(new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
assertThat(
searchRequestBuilder.toString(),
@ -82,8 +68,9 @@ public class SearchRequestBuilderTests extends OpenSearchTestCase {
}
public void testThatToStringDoesntWipeRequestSource() {
SearchRequestBuilder searchRequestBuilder = client.prepareSearch()
.setSource(new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
SearchRequestBuilder searchRequestBuilder = createBuilder().setSource(
new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value"))
);
String preToString = searchRequestBuilder.request().toString();
assertThat(
searchRequestBuilder.toString(),

View File

@ -1,232 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.opensearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.transport.CloseableConnection;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.ConnectionProfile;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportMessageListener;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportStats;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
abstract class FailAndRetryMockTransport<Response extends TransportResponse> implements Transport {
private final Random random;
private final ClusterName clusterName;
private final RequestHandlers requestHandlers = new RequestHandlers();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportMessageListener listener;
private boolean connectMode = true;
private final AtomicInteger connectTransportExceptions = new AtomicInteger();
private final AtomicInteger failures = new AtomicInteger();
private final AtomicInteger successes = new AtomicInteger();
private final Set<DiscoveryNode> triedNodes = new CopyOnWriteArraySet<>();
FailAndRetryMockTransport(Random random, ClusterName clusterName) {
this.random = new Random(random.nextLong());
this.clusterName = clusterName;
}
protected abstract ClusterState getMockClusterState(DiscoveryNode node);
@Override
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> connectionListener) {
connectionListener.onResponse(new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
// we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
transportResponseHandler.handleResponse(new LivenessResponse(clusterName, node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, false));
} else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
Version version = node.getVersion();
transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, version));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
}
return;
}
// once nodes are connected we'll just return errors for each sendRequest call
triedNodes.add(node);
if (random.nextInt(100) > 10) {
connectTransportExceptions.incrementAndGet();
throw new ConnectTransportException(node, "node not available");
} else {
if (random.nextBoolean()) {
failures.incrementAndGet();
// throw whatever exception that is not a subclass of ConnectTransportException
throw new IllegalStateException();
} else {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
if (random.nextBoolean()) {
successes.incrementAndGet();
transportResponseHandler.handleResponse(newResponse());
} else {
failures.incrementAndGet();
transportResponseHandler.handleException(new TransportException("transport exception"));
}
}
}
}
});
}
protected abstract Response newResponse();
public void endConnectMode() {
this.connectMode = false;
}
public int connectTransportExceptions() {
return connectTransportExceptions.get();
}
public int failures() {
return failures.get();
}
public int successes() {
return successes.get();
}
public Set<DiscoveryNode> triedNodes() {
return triedNodes;
}
@Override
public BoundTransportAddress boundAddress() {
return null;
}
@Override
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
throw new UnsupportedOperationException();
}
@Override
public Lifecycle.State lifecycleState() {
return null;
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void start() {}
@Override
public void stop() {}
@Override
public void close() {}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return Collections.emptyMap();
}
@Override
public TransportStats getStats() {
throw new UnsupportedOperationException();
}
@Override
public ResponseHandlers getResponseHandlers() {
return responseHandlers;
}
@Override
public RequestHandlers getRequestHandlers() {
return requestHandlers;
}
@Override
public void setMessageListener(TransportMessageListener listener) {
this.listener = listener;
}
}

View File

@ -1,240 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.Version;
import org.opensearch.action.ActionType;
import org.opensearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.opensearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.AbstractClientHeadersTestCase;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
private MockTransportService transportService;
@Override
public void tearDown() throws Exception {
try {
// stop this first before we bubble up since
// transportService uses the threadpool that super.tearDown will close
transportService.stop();
transportService.close();
} finally {
super.tearDown();
}
}
@Override
protected Client buildClient(Settings headersSettings, ActionType[] testedActions) {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
transportService.start();
transportService.acceptIncomingRequests();
String transport = getTestTransportType();
TransportClient client = new MockTransportClient(
Settings.builder()
.put("client.transport.sniff", false)
.put("cluster.name", "cluster1")
.put("node.name", "transport_client_" + this.getTestName())
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
.put(headersSettings)
.build(),
InternalTransportServiceInterceptor.TestPlugin.class
);
InternalTransportServiceInterceptor.TestPlugin plugin = client.injector.getInstance(PluginsService.class)
.filterPlugins(InternalTransportServiceInterceptor.TestPlugin.class)
.stream()
.findFirst()
.get();
plugin.instance.threadPool = client.threadPool();
plugin.instance.address = transportService.boundAddress().publishAddress();
client.addTransportAddress(transportService.boundAddress().publishAddress());
return client;
}
public void testWithSniffing() throws Exception {
String transport = getTestTransportType();
try (
TransportClient client = new MockTransportClient(
Settings.builder()
.put("client.transport.sniff", true)
.put("cluster.name", "cluster1")
.put("node.name", "transport_client_" + this.getTestName() + "_1")
.put("client.transport.nodes_sampler_interval", "1s")
.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transport)
.put(HEADER_SETTINGS)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build(),
InternalTransportServiceInterceptor.TestPlugin.class
)
) {
InternalTransportServiceInterceptor.TestPlugin plugin = client.injector.getInstance(PluginsService.class)
.filterPlugins(InternalTransportServiceInterceptor.TestPlugin.class)
.stream()
.findFirst()
.get();
plugin.instance.threadPool = client.threadPool();
plugin.instance.address = transportService.boundAddress().publishAddress();
client.addTransportAddress(transportService.boundAddress().publishAddress());
if (!plugin.instance.clusterStateLatch.await(5, TimeUnit.SECONDS)) {
fail("takes way too long to get the cluster state");
}
assertEquals(1, client.connectedNodes().size());
assertEquals(client.connectedNodes().get(0).getAddress(), transportService.boundAddress().publishAddress());
}
}
public static class InternalTransportServiceInterceptor implements TransportInterceptor {
ThreadPool threadPool;
TransportAddress address;
public static class TestPlugin extends Plugin implements NetworkPlugin {
private InternalTransportServiceInterceptor instance = new InternalTransportServiceInterceptor();
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext
) {
return Collections.singletonList(new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler
) {
return instance.interceptHandler(action, executor, forceExecution, actualHandler);
}
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return instance.interceptSender(sender);
}
});
}
}
final CountDownLatch clusterStateLatch = new CountDownLatch(1);
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
final ClusterName clusterName = new ClusterName("cluster1");
if (TransportLivenessAction.NAME.equals(action)) {
assertHeaders(threadPool);
((TransportResponseHandler<LivenessResponse>) handler).handleResponse(
new LivenessResponse(clusterName, connection.getNode())
);
} else if (ClusterStateAction.NAME.equals(action)) {
assertHeaders(threadPool);
ClusterName cluster1 = clusterName;
ClusterState.Builder builder = ClusterState.builder(cluster1);
// the sniffer detects only data nodes
builder.nodes(
DiscoveryNodes.builder()
.add(
new DiscoveryNode(
"node_id",
"someId",
"some_ephemeralId_id",
address.address().getHostString(),
address.getAddress(),
address,
Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE),
Version.CURRENT
)
)
);
((TransportResponseHandler<ClusterStateResponse>) handler).handleResponse(
new ClusterStateResponse(cluster1, builder.build(), false)
);
clusterStateLatch.countDown();
} else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
((TransportResponseHandler<TransportService.HandshakeResponse>) handler).handleResponse(
new TransportService.HandshakeResponse(connection.getNode(), clusterName, connection.getNode().getVersion())
);
} else {
handler.handleException(new TransportException("", new InternalException(action)));
}
}
};
}
}
}

View File

@ -1,497 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.opensearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.node.Node;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.hamcrest.CustomMatcher;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.opensearch.test.transport.MockTransportService.createNewService;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
public class TransportClientNodesServiceTests extends OpenSearchTestCase {
private static class TestIteration implements Closeable {
private final ThreadPool threadPool;
private final FailAndRetryMockTransport<TestResponse> transport;
private final MockTransportService transportService;
private final TransportClientNodesService transportClientNodesService;
private final int listNodesCount;
private final int sniffNodesCount;
private TransportAddress livenessAddress = buildNewFakeTransportAddress();
final List<TransportAddress> listNodeAddresses;
// map for each address of the nodes a cluster state request should respond with
final Map<TransportAddress, DiscoveryNodes> nodeMap;
TestIteration() {
this(Settings.EMPTY);
}
TestIteration(Settings extraSettings) {
Settings settings = Settings.builder().put(extraSettings).put("cluster.name", "test").build();
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
List<TransportAddress> listNodes = new ArrayList<>();
Map<TransportAddress, DiscoveryNodes> nodeMap = new HashMap<>();
this.listNodesCount = randomIntBetween(1, 10);
int sniffNodesCount = 0;
for (int i = 0; i < listNodesCount; i++) {
TransportAddress transportAddress = buildNewFakeTransportAddress();
listNodes.add(transportAddress);
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
discoNodes.add(new DiscoveryNode("#list-node#-" + transportAddress, transportAddress, Version.CURRENT));
if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings)) {
final int numSniffNodes = randomIntBetween(0, 3);
for (int j = 0; j < numSniffNodes; ++j) {
TransportAddress sniffAddress = buildNewFakeTransportAddress();
DiscoveryNode sniffNode = new DiscoveryNode("#sniff-node#-" + sniffAddress, sniffAddress, Version.CURRENT);
discoNodes.add(sniffNode);
// also allow sniffing of the sniff node itself
nodeMap.put(sniffAddress, DiscoveryNodes.builder().add(sniffNode).build());
++sniffNodesCount;
}
}
nodeMap.put(transportAddress, discoNodes.build());
}
listNodeAddresses = listNodes;
this.nodeMap = nodeMap;
this.sniffNodesCount = sniffNodesCount;
threadPool = new TestThreadPool("transport-client-nodes-service-tests");
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
@Override
public List<String> getDefaultSeedAddresses() {
return Collections.emptyList();
}
@Override
protected TestResponse newResponse() {
return new TestResponse();
}
@Override
protected ClusterState getMockClusterState(DiscoveryNode node) {
return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build();
}
};
transportService = new MockTransportService(settings, transport, threadPool, new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
if (TransportLivenessAction.NAME.equals(action)) {
sender.sendRequest(
connection,
action,
request,
options,
wrapLivenessResponseHandler(handler, connection.getNode(), clusterName)
);
} else {
sender.sendRequest(connection, action, request, options, handler);
}
}
};
}
}, (addr) -> {
assert addr == null : "boundAddress: " + addr;
return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID());
}, null, Collections.emptySet());
transportService.addNodeConnectedBehavior((cm, dn) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
transport.openConnection(discoveryNode, null, future);
return future.actionGet();
});
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(settings, transportService, threadPool, (a, b) -> {});
transportClientNodesService.addTransportAddresses(listNodeAddresses.toArray(new TransportAddress[0]));
}
/**
 * Wraps a response handler so that a successful liveness reply is rewritten to carry a
 * test-controlled identity: the node keeps its id/attributes/roles/version, but host name,
 * host address and transport address are replaced with "liveness-" values and the shared
 * {@code livenessAddress}. Tests then assert that the published node list comes from the
 * liveness responses rather than from the originally configured addresses.
 *
 * @param handler     delegate that receives the rewritten response (reads, failures and
 *                    executor choice are forwarded to it unchanged)
 * @param node        node whose identity fields are preserved in the forged response
 * @param clusterName cluster name reported by the forged {@link LivenessResponse}
 * @return a handler that substitutes a forged liveness response on success
 */
private <T extends TransportResponse> TransportResponseHandler<T> wrapLivenessResponseHandler(
    TransportResponseHandler<T> handler,
    DiscoveryNode node,
    ClusterName clusterName
) {
    return new TransportResponseHandler<T>() {
        @Override
        public T read(StreamInput in) throws IOException {
            return handler.read(in);
        }

        @Override
        @SuppressWarnings("unchecked")
        public void handleResponse(T response) {
            // Forge a liveness response: keep the node identity, replace host name,
            // host address and transport address with test-controlled values.
            LivenessResponse livenessResponse = new LivenessResponse(
                clusterName,
                new DiscoveryNode(
                    node.getName(),
                    node.getId(),
                    node.getEphemeralId(),
                    "liveness-hostname" + node.getId(),
                    "liveness-hostaddress" + node.getId(),
                    livenessAddress,
                    node.getAttributes(),
                    node.getRoles(),
                    node.getVersion()
                )
            );
            // Unchecked cast: for the liveness action T is LivenessResponse in practice.
            handler.handleResponse((T) livenessResponse);
        }

        @Override
        public void handleException(TransportException exp) {
            handler.handleException(exp);
        }

        @Override
        public String executor() {
            return handler.executor();
        }
    };
}
@Override
public void close() {
    // Tear-down order matters: first let any pending mock connection attempts complete,
    // then stop the transport service before closing the nodes service that depends on it,
    // and finally terminate the thread pool backing both.
    transport.endConnectMode();
    transportService.stop();
    transportClientNodesService.close();
    terminate(threadPool);
}
}
// Muted; see the linked issue for the flakiness being tracked.
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37567")
public void testListenerFailures() throws InterruptedException {
    // Repeatedly drives TransportClientNodesService.execute(...) with a callback that
    // randomly fails before sending, against a mock transport that randomly fails or
    // succeeds, then verifies the final listener observes exactly one terminal outcome.
    int iters = iterations(10, 100);
    for (int i = 0; i < iters; i++) {
        try (TestIteration iteration = new TestIteration()) {
            iteration.transport.endConnectMode(); // stop transport from responding early
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicInteger finalFailures = new AtomicInteger();
            final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
            final AtomicReference<TestResponse> response = new AtomicReference<>();
            // Terminal listener: records either the response or the failure, then releases the latch.
            ActionListener<TestResponse> actionListener = new ActionListener<TestResponse>() {
                @Override
                public void onResponse(TestResponse testResponse) {
                    response.set(testResponse);
                    latch.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    finalFailures.incrementAndGet();
                    finalFailure.set(e);
                    latch.countDown();
                }
            };
            final AtomicInteger preSendFailures = new AtomicInteger();
            iteration.transportClientNodesService.execute((node, retryListener) -> {
                if (rarely()) {
                    preSendFailures.incrementAndGet();
                    // throw whatever exception that is not a subclass of ConnectTransportException
                    throw new IllegalArgumentException();
                }
                iteration.transportService.sendRequest(
                    node,
                    "action",
                    new TestRequest(),
                    TransportRequestOptions.EMPTY,
                    new TransportResponseHandler<TestResponse>() {
                        @Override
                        public TestResponse read(StreamInput in) {
                            return new TestResponse(in);
                        }

                        @Override
                        public void handleResponse(TestResponse response1) {
                            retryListener.onResponse(response1);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            retryListener.onFailure(exp);
                        }

                        @Override
                        public String executor() {
                            // exercise both same-thread and pooled response handling
                            return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
                        }
                    }
                );
            }, actionListener);
            latch.await();

            // there can be only either one failure that causes the request to fail straightaway or success
            assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1));

            if (iteration.transport.successes() == 1) {
                // exactly one success: the listener saw a response and no failure
                assertThat(finalFailures.get(), equalTo(0));
                assertThat(finalFailure.get(), nullValue());
                assertThat(response.get(), notNullValue());
            } else {
                // no success: the listener saw exactly one terminal failure
                assertThat(finalFailures.get(), equalTo(1));
                assertThat(finalFailure.get(), notNullValue());
                assertThat(response.get(), nullValue());
                if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) {
                    // every attempt failed on connect, so all candidate nodes were exhausted
                    assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class));
                }
            }

            // retries never visit more nodes than are listed, and every tried node accounts
            // for exactly one connect exception, failure, or success
            assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.listNodesCount));
            assertThat(
                iteration.transport.triedNodes().size(),
                equalTo(
                    iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()
                )
            );
        }
    }
}
public void testConnectedNodes() {
    // Every connected node must originate from the configured listed addresses, while its
    // host name/address must come from the (rewritten) liveness response.
    final int rounds = iterations(10, 100);
    for (int round = 0; round < rounds; round++) {
        try (TestIteration it = new TestIteration()) {
            assertThat(it.transportClientNodesService.connectedNodes().size(), lessThanOrEqualTo(it.listNodesCount));
            for (DiscoveryNode node : it.transportClientNodesService.connectedNodes()) {
                // identity fields were rewritten by the liveness handler ...
                assertThat(node.getHostName(), startsWith("liveness-"));
                assertThat(node.getHostAddress(), startsWith("liveness-"));
                // ... but the transport address is still one of the originally listed ones
                assertNotEquals(node.getAddress(), it.livenessAddress);
                assertThat(it.listNodeAddresses, hasItem(node.getAddress()));
            }
        }
    }
}
/** Removing a listed address must disconnect exactly one node when sniffing is enabled. */
public void testRemoveAddressSniff() {
    checkRemoveAddress(true);
}
/** Removing a listed address must disconnect exactly one node in simple (non-sniffing) mode. */
public void testRemoveAddressSimple() {
    checkRemoveAddress(false);
}
/**
 * Shared body for the remove-address tests: runs one iteration with the requested sampling
 * mode and verifies that removing a single listed transport address disconnects exactly
 * that node and no other.
 */
private void checkRemoveAddress(boolean sniff) {
    final Settings sniffSettings = Settings.builder().put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), sniff).build();
    try (TestIteration it = new TestIteration(sniffSettings)) {
        final TransportClientNodesService nodesService = it.transportClientNodesService;
        final int expectedBefore = it.listNodesCount + it.sniffNodesCount;
        assertEquals(expectedBefore, nodesService.connectedNodes().size());

        final TransportAddress removed = randomFrom(it.listNodeAddresses);
        nodesService.removeTransportAddress(removed);

        // no surviving connected node may still carry the removed address
        assertThat(nodesService.connectedNodes(), everyItem(not(new CustomMatcher<DiscoveryNode>("removed address") {
            @Override
            public boolean matches(Object item) {
                return item instanceof DiscoveryNode && ((DiscoveryNode) item).getAddress().equals(removed);
            }
        })));
        assertEquals(expectedBefore - 1, nodesService.connectedNodes().size());
    }
}
public void testSniffNodesSamplerClosesConnections() throws Exception {
    // Verifies that the sniffing sampler opens a temporary ping connection per sample and
    // always closes it again, even when the remote node never answers the request.
    final TestThreadPool threadPool = new TestThreadPool("testSniffNodesSamplerClosesConnections");

    Settings remoteSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "remote").build();
    try (MockTransportService remoteService = createNewService(remoteSettings, Version.CURRENT, threadPool, null)) {
        final MockHandler handler = new MockHandler(remoteService);
        remoteService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, handler);
        remoteService.start();
        remoteService.acceptIncomingRequests();

        // short ping timeout so the non-responding sample below finishes quickly;
        // long sampler interval so only explicit doSample() calls run
        Settings clientSettings = Settings.builder()
            .put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
            .put(TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.getKey(), TimeValue.timeValueSeconds(1))
            .put(TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.getKey(), TimeValue.timeValueSeconds(30))
            .build();

        try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
            final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();

            // record every connection the client opens to the remote so the test can assert on lifecycle
            clientService.addConnectBehavior(
                remoteService,
                (transport, discoveryNode, profile, listener) -> transport.openConnection(
                    discoveryNode,
                    profile,
                    ActionListener.delegateFailure(listener, (delegatedListener, connection) -> {
                        establishedConnections.add(connection);
                        delegatedListener.onResponse(connection);
                    })
                )
            );

            clientService.start();
            clientService.acceptIncomingRequests();

            try (
                TransportClientNodesService transportClientNodesService = new TransportClientNodesService(
                    clientSettings,
                    clientService,
                    threadPool,
                    (a, b) -> {}
                )
            ) {
                assertEquals(0, transportClientNodesService.connectedNodes().size());
                assertEquals(0, establishedConnections.size());

                transportClientNodesService.addTransportAddresses(remoteService.getLocalDiscoNode().getAddress());
                assertEquals(1, transportClientNodesService.connectedNodes().size());
                assertEquals(1, clientService.connectionManager().size());

                // a normal sample leaves only the long-lived connection behind
                transportClientNodesService.doSample();
                assertEquals(1, clientService.connectionManager().size());
                establishedConnections.clear();

                // make the remote drop requests so the sample's ping connection times out
                handler.failToRespond();
                Thread thread = new Thread(transportClientNodesService::doSample);
                thread.start();

                // the temporary ping connection must be open while the sample is in flight ...
                assertBusy(() -> assertTrue(establishedConnections.size() >= 1));
                assertFalse("Temporary ping connection must be opened", establishedConnections.get(0).isClosed());

                thread.join();

                // ... and closed once the sample completes, even though it failed
                assertTrue(establishedConnections.get(0).isClosed());
            }
        }
    } finally {
        terminate(threadPool);
    }
}
class MockHandler implements TransportRequestHandler<ClusterStateRequest> {
private final AtomicBoolean failToRespond = new AtomicBoolean(false);
private final MockTransportService transportService;
MockHandler(MockTransportService transportService) {
this.transportService = transportService;
}
@Override
public void messageReceived(ClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
if (failToRespond.get()) {
return;
}
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(transportService.getLocalDiscoNode()).build();
ClusterState build = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
channel.sendResponse(new ClusterStateResponse(ClusterName.DEFAULT, build, false));
}
void failToRespond() {
if (failToRespond.compareAndSet(false, true) == false) {
throw new AssertionError("Request handler is already marked as failToRespond");
}
}
}
/** Empty request used to drive the mock transport in these tests; carries no payload. */
public static class TestRequest extends TransportRequest {
}
/** Empty response counterpart of {@code TestRequest}; serialization is a no-op. */
private static class TestResponse extends TransportResponse {

    private TestResponse() {}

    // stream constructor required by the transport layer; nothing to read
    private TestResponse(StreamInput in) {}

    @Override
    public void writeTo(StreamOutput out) throws IOException {}
}
}

View File

@ -1,128 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.client.transport;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.common.io.stream.NamedWriteable;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.env.Environment;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportSettings;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.object.HasToString.hasToString;
/**
 * Unit tests for the (deprecated) transport client: closed-client failure behavior,
 * plugin {@link NamedWriteable} registration, feature advertisement in settings, and
 * default thread-context headers contributed by plugins.
 */
public class TransportClientTests extends OpenSearchTestCase {

    public void testThatUsingAClosedClientThrowsAnException() throws ExecutionException, InterruptedException {
        final TransportClient client = new MockTransportClient(Settings.EMPTY);
        client.close();
        // any action after close() must fail fast with an IllegalStateException
        final IllegalStateException e = expectThrows(
            IllegalStateException.class,
            () -> client.admin().cluster().health(new ClusterHealthRequest()).get()
        );
        assertThat(e, hasToString(containsString("transport client is closed")));
    }

    /**
     * test that when plugins are provided that want to register
     * {@link NamedWriteable}, those are also made known to the
     * {@link NamedWriteableRegistry} of the transport client
     */
    public void testPluginNamedWriteablesRegistered() {
        Settings baseSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
        try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
            assertNotNull(client.namedWriteableRegistry.getReader(MockPlugin.MockNamedWriteable.class, MockPlugin.MockNamedWriteable.NAME));
        }
    }

    // the client must advertise itself via a "transport_client" entry in the features setting
    public void testSettingsContainsTransportClient() {
        final Settings baseSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
        try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
            final Settings settings = TransportSettings.DEFAULT_FEATURES_SETTING.get(client.settings());
            assertThat(settings.keySet(), hasItem("transport_client"));
            assertThat(settings.get("transport_client"), equalTo("true"));
        }
    }

    // MockPlugin.additionalSettings() seeds a default "test" header into the thread context
    public void testDefaultHeader() {
        final Settings baseSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
        try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
            final ThreadContext threadContext = client.threadPool().getThreadContext();
            assertEquals("true", threadContext.getHeader("test"));
        }
    }

    /** Plugin fixture registering one named writeable and one default thread-context header. */
    public static class MockPlugin extends Plugin {

        @Override
        public List<Entry> getNamedWriteables() {
            return Arrays.asList(new Entry[] { new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new) });
        }

        @Override
        public Settings additionalSettings() {
            // surfaces as thread-context header "test" = "true"
            return Settings.builder().put(ThreadContext.PREFIX + "." + "test", true).build();
        }

        /** Minimal named writeable; carries no state and writes nothing. */
        public class MockNamedWriteable implements NamedWriteable {

            static final String NAME = "mockNamedWritable";

            MockNamedWriteable(StreamInput in) {}

            @Override
            public void writeTo(StreamOutput out) throws IOException {}

            @Override
            public String getWriteableName() {
                return NAME;
            }
        }
    }
}

View File

@ -33,7 +33,6 @@
package org.opensearch.cluster;
import org.opensearch.Version;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterState.FeatureAware;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.io.stream.BytesStreamOutput;
@ -46,7 +45,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Optional;
import static org.opensearch.test.VersionUtils.randomVersionBetween;
@ -151,41 +149,21 @@ public class FeatureAwareTests extends OpenSearchTestCase {
final Version version = VersionUtils.randomVersion(random());
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
final Custom custom = new RequiredFeatureCustom(version);
{
// the feature is present and the client is not a transport client
// the feature is present
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(custom.getRequiredFeature().isPresent());
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
assertTrue(FeatureAware.shouldSerialize(out, custom));
}
{
// the feature is present and the client is a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(custom.getRequiredFeature().isPresent());
out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE)));
assertTrue(FeatureAware.shouldSerialize(out, custom));
}
}
public void testMissingFeature() {
final Version version = VersionUtils.randomVersion(random());
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
final Custom custom = new RequiredFeatureCustom(version);
{
// the feature is missing but we should serialize it anyway because the client is not a transport client
// the feature is missing
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(FeatureAware.shouldSerialize(out, custom));
}
{
// the feature is missing and we should not serialize it because the client is a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE));
assertFalse(FeatureAware.shouldSerialize(out, custom));
}
}
}

View File

@ -33,7 +33,6 @@ package org.opensearch.persistent;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.Version;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
@ -80,7 +79,6 @@ import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_SNAPSHOT;
import static org.opensearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
import static org.opensearch.test.VersionUtils.allReleasedVersions;
import static org.opensearch.test.VersionUtils.compatibleFutureVersion;
import static org.opensearch.test.VersionUtils.getFirstVersion;
import static org.opensearch.test.VersionUtils.getPreviousVersion;
import static org.opensearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.equalTo;
@ -307,12 +305,7 @@ public class PersistentTasksCustomMetadataTests extends AbstractDiffableSerializ
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(streamVersion);
Set<String> features = new HashSet<>();
final boolean transportClient = randomBoolean();
if (transportClient) {
features.add(TransportClient.TRANSPORT_CLIENT_FEATURE);
}
// if a transport client, then it must have the feature otherwise we add the feature randomly
if (transportClient || randomBoolean()) {
if (randomBoolean()) {
features.add("test");
}
out.setFeatures(features);
@ -327,40 +320,6 @@ public class PersistentTasksCustomMetadataTests extends AbstractDiffableSerializ
assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible_version")));
}
public void testFeatureSerialization() throws IOException {
PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder();
Version minVersion = getFirstVersion();
tasks.addTask(
"test_compatible",
TestPersistentTasksExecutor.NAME,
new TestParams(
null,
randomVersionBetween(random(), minVersion, Version.CURRENT),
randomBoolean() ? Optional.empty() : Optional.of("existing")
),
randomAssignment()
);
tasks.addTask(
"test_incompatible",
TestPersistentTasksExecutor.NAME,
new TestParams(null, randomVersionBetween(random(), minVersion, Version.CURRENT), Optional.of("non_existing")),
randomAssignment()
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(Version.CURRENT);
Set<String> features = new HashSet<>();
features.add("existing");
features.add(TransportClient.TRANSPORT_CLIENT_FEATURE);
out.setFeatures(features);
tasks.build().writeTo(out);
PersistentTasksCustomMetadata read = new PersistentTasksCustomMetadata(
new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry())
);
assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible")));
}
public void testDisassociateDeadNodes_givenNoPersistentTasks() {
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")).build();
ClusterState returnedState = PersistentTasksCustomMetadata.disassociateDeadNodes(originalState);

View File

@ -37,6 +37,7 @@ import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
@ -239,4 +240,8 @@ public class MockNode extends Node {
protected void configureNodeAndClusterIdStateListener(ClusterService clusterService) {
// do not configure this in tests as this is causing SetOnce to throw exceptions when jvm is used for multiple tests
}
public NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}
}

View File

@ -135,7 +135,7 @@ public abstract class AbstractMultiClustersTestCase extends OpenSearchTestCase {
mockPlugins,
Function.identity()
);
cluster.beforeTest(random(), 0);
cluster.beforeTest(random());
clusters.put(clusterAlias, cluster);
}
clusterGroup = new ClusterGroup(clusters);

View File

@ -34,6 +34,7 @@ package org.opensearch.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
@ -47,18 +48,20 @@ import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.env.Environment;
import org.opensearch.http.HttpInfo;
import org.opensearch.node.MockNode;
import org.opensearch.plugins.Plugin;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportSettings;
import org.opensearch.transport.nio.MockNioTransportPlugin;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.HTTP;
import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS;
@ -79,7 +82,8 @@ public final class ExternalTestCluster extends TestCluster {
private static final AtomicInteger counter = new AtomicInteger();
public static final String EXTERNAL_CLUSTER_PREFIX = "external_";
private final MockTransportClient client;
private final MockNode node;
private final Client client;
private final InetSocketAddress[] httpAddresses;
@ -91,16 +95,27 @@ public final class ExternalTestCluster extends TestCluster {
public ExternalTestCluster(
Path tempDir,
Settings additionalSettings,
Function<Client, Client> clientWrapper,
String clusterName,
Collection<Class<? extends Plugin>> pluginClasses,
TransportAddress... transportAddresses
) {
super(0);
this.clusterName = clusterName;
Settings.Builder clientSettingsBuilder = Settings.builder()
.put(additionalSettings)
.put("node.name", InternalTestCluster.TRANSPORT_CLIENT_PREFIX + EXTERNAL_CLUSTER_PREFIX + counter.getAndIncrement())
.put("client.transport.ignore_cluster_name", true)
.put(TransportSettings.PORT.getKey(), OpenSearchTestCase.getPortRange())
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir);
.put("node.master", false)
.put("node.data", false)
.put("node.ingest", false)
.put("node.name", EXTERNAL_CLUSTER_PREFIX + counter.getAndIncrement())
.put("cluster.name", clusterName)
.putList(
"discovery.seed_hosts",
Arrays.stream(transportAddresses).map(TransportAddress::toString).collect(Collectors.toList())
);
if (Environment.PATH_HOME_SETTING.exists(additionalSettings) == false) {
clientSettingsBuilder.put(Environment.PATH_HOME_SETTING.getKey(), tempDir);
}
boolean addMockTcpTransport = additionalSettings.get(NetworkModule.TRANSPORT_TYPE_KEY) == null;
if (addMockTcpTransport) {
@ -111,10 +126,13 @@ public final class ExternalTestCluster extends TestCluster {
pluginClasses.add(MockNioTransportPlugin.class);
}
}
pluginClasses = new ArrayList<>(pluginClasses);
pluginClasses.add(MockHttpTransport.TestPlugin.class);
Settings clientSettings = clientSettingsBuilder.build();
MockTransportClient client = new MockTransportClient(clientSettings, pluginClasses);
MockNode node = new MockNode(clientSettings, pluginClasses);
Client client = clientWrapper.apply(node.client());
try {
client.addTransportAddresses(transportAddresses);
node.start();
NodesInfoResponse nodeInfos = client.admin()
.cluster()
.prepareNodesInfo()
@ -122,7 +140,6 @@ public final class ExternalTestCluster extends TestCluster {
.addMetrics(SETTINGS.metricName(), HTTP.metricName())
.get();
httpAddresses = new InetSocketAddress[nodeInfos.getNodes().size()];
this.clusterName = nodeInfos.getClusterName().value();
int dataNodes = 0;
int masterAndDataNodes = 0;
for (int i = 0; i < nodeInfos.getNodes().size(); i++) {
@ -138,11 +155,17 @@ public final class ExternalTestCluster extends TestCluster {
this.numDataNodes = dataNodes;
this.numMasterAndDataNodes = masterAndDataNodes;
this.client = client;
this.node = node;
logger.info("Setup ExternalTestCluster [{}] made of [{}] nodes", nodeInfos.getClusterName().value(), size());
} catch (Exception e) {
try {
client.close();
throw e;
node.close();
} catch (IOException e1) {
e.addSuppressed(e1);
}
throw new OpenSearchException(e);
}
}
@ -179,6 +202,7 @@ public final class ExternalTestCluster extends TestCluster {
@Override
public void close() throws IOException {
client.close();
node.close();
}
@Override
@ -236,7 +260,7 @@ public final class ExternalTestCluster extends TestCluster {
@Override
public NamedWriteableRegistry getNamedWriteableRegistry() {
return client.getNamedWriteableRegistry();
return node.getNamedWriteableRegistry();
}
@Override

View File

@ -51,7 +51,6 @@ import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.client.Client;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
@ -76,12 +75,10 @@ import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.SecureSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
@ -116,7 +113,6 @@ import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.node.MockNode;
import org.opensearch.node.Node;
import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.node.NodeRoleSettings;
import org.opensearch.node.NodeService;
import org.opensearch.node.NodeValidationException;
import org.opensearch.plugins.Plugin;
@ -126,7 +122,6 @@ import org.opensearch.search.SearchService;
import org.opensearch.tasks.TaskManager;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.MockTransportClient;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;
@ -171,7 +166,6 @@ import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
import static org.opensearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
import static org.opensearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.getTestTransportType;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.opensearch.test.NodeRoles.dataOnlyNode;
import static org.opensearch.test.NodeRoles.masterOnlyNode;
@ -194,7 +188,7 @@ import static org.junit.Assert.fail;
* The cluster supports randomized configuration such that nodes started in the cluster will
* automatically load asserting services tracking resources like file handles or open searchers.
* <p>
* The Cluster is bound to a test lifecycle where tests must call {@link #beforeTest(java.util.Random, double)} and
* The Cluster is bound to a test lifecycle where tests must call {@link #beforeTest(java.util.Random)} and
* {@link #afterTest()} to initialize and reset the cluster in order to be more reproducible. The term "more" relates
* to the async nature of OpenSearch in combination with randomized testing. Once Threads and asynchronous calls
* are involved reproducibility is very limited. This class should only be used through {@link OpenSearchIntegTestCase}.
@ -813,7 +807,7 @@ public final class InternalTestCluster extends TestCluster {
public synchronized Client client() {
ensureOpen();
/* Randomly return a client to one of the nodes in the cluster */
return getOrBuildRandomNode().client(random);
return getOrBuildRandomNode().client();
}
/**
@ -822,7 +816,7 @@ public final class InternalTestCluster extends TestCluster {
*/
public Client dataNodeClient() {
/* Randomly return a client to one of the nodes in the cluster */
return getRandomNodeAndClient(DATA_NODE_PREDICATE).client(random);
return getRandomNodeAndClient(DATA_NODE_PREDICATE).client();
}
/**
@ -855,12 +849,12 @@ public final class InternalTestCluster extends TestCluster {
ensureOpen();
NodeAndClient randomNodeAndClient = getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE);
if (randomNodeAndClient != null) {
return randomNodeAndClient.client(random);
return randomNodeAndClient.client();
}
int nodeId = nextNodeId.getAndIncrement();
Settings settings = getSettings(nodeId, random.nextLong(), Settings.EMPTY);
startCoordinatingOnlyNode(settings);
return getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE).client(random);
return getRandomNodeAndClient(NO_DATA_NO_MASTER_PREDICATE).client();
}
public synchronized String startCoordinatingOnlyNode(Settings settings) {
@ -868,21 +862,13 @@ public final class InternalTestCluster extends TestCluster {
return startNode(noRoles(settings));
}
/**
* Returns a transport client
*/
public synchronized Client transportClient() {
// randomly return a transport client going to one of the nodes in the cluster
return getOrBuildRandomNode().transportClient();
}
/**
* Returns a node client to a given node.
*/
public Client client(String nodeName) {
NodeAndClient nodeAndClient = nodes.get(nodeName);
if (nodeAndClient != null) {
return nodeAndClient.client(random);
return nodeAndClient.client();
}
throw new AssertionError("No node found with name: [" + nodeName + "]");
}
@ -920,7 +906,6 @@ public final class InternalTestCluster extends TestCluster {
private MockNode node;
private final Settings originalNodeSettings;
private Client nodeClient;
private Client transportClient;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final String name;
private final int nodeAndClientId;
@ -952,17 +937,9 @@ public final class InternalTestCluster extends TestCluster {
return DiscoveryNode.isMasterNode(node.settings());
}
Client client(Random random) {
double nextDouble = random.nextDouble();
if (nextDouble < transportClientRatio) {
if (logger.isTraceEnabled()) {
logger.trace("Using transport client for node [{}] sniff: [{}]", node.settings().get("node.name"), false);
}
return getOrBuildTransportClient();
} else {
Client client() {
return getOrBuildNodeClient();
}
}
Client nodeClient() {
if (closed.get()) {
@ -971,13 +948,6 @@ public final class InternalTestCluster extends TestCluster {
return getOrBuildNodeClient();
}
Client transportClient() {
if (closed.get()) {
throw new RuntimeException("already closed");
}
return getOrBuildTransportClient();
}
private Client getOrBuildNodeClient() {
synchronized (InternalTestCluster.this) {
if (closed.get()) {
@ -990,31 +960,10 @@ public final class InternalTestCluster extends TestCluster {
}
}
private Client getOrBuildTransportClient() {
synchronized (InternalTestCluster.this) {
if (closed.get()) {
throw new RuntimeException("already closed");
}
if (transportClient == null) {
/* don't sniff client for now - doesn't work will all tests
* since it might throw NoNodeAvailableException if nodes are
* shut down. we first need support of transportClientRatio
* as annotations or so */
transportClient = new TransportClientFactory(
nodeConfigurationSource.transportClientSettings(),
baseDir,
nodeConfigurationSource.transportClientPlugins()
).client(node, clusterName);
}
return clientWrapper.apply(transportClient);
}
}
void resetClient() {
if (closed.get() == false) {
Releasables.close(nodeClient, transportClient);
Releasables.close(nodeClient);
nodeClient = null;
transportClient = null;
}
}
@ -1123,53 +1072,9 @@ public final class InternalTestCluster extends TestCluster {
}
}
public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_";
private static class TransportClientFactory {
private final Settings settings;
private final Path baseDir;
private final Collection<Class<? extends Plugin>> plugins;
TransportClientFactory(Settings settings, Path baseDir, Collection<Class<? extends Plugin>> plugins) {
this.settings = settings != null ? settings : Settings.EMPTY;
this.baseDir = baseDir;
this.plugins = plugins;
}
public Client client(Node node, String clusterName) {
TransportAddress addr = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
Settings nodeSettings = node.settings();
Builder builder = Settings.builder()
.put("client.transport.nodes_sampler_interval", "1s")
.put(Environment.PATH_HOME_SETTING.getKey(), baseDir)
.put("node.name", TRANSPORT_CLIENT_PREFIX + node.settings().get("node.name"))
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName)
.put("client.transport.sniff", false)
.put("logger.prefix", nodeSettings.get("logger.prefix", ""))
.put("logger.level", nodeSettings.get("logger.level", "INFO"))
.put(settings);
if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) {
String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings);
builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), transportType);
} else {
builder.put(NetworkModule.TRANSPORT_TYPE_SETTING.getKey(), getTestTransportType());
}
/*
* The node.roles setting does not make sense for the transport client, filter it. If the transport client were not deprecated
* we would probably want to invest in infrastructure to mark a setting as not applicable to the transport client and then
* filter all such settings here.
*/
final Settings finalSettings = builder.build().filter(k -> k.equals(NodeRoleSettings.NODE_ROLES_SETTING.getKey()) == false);
TransportClient client = new MockTransportClient(finalSettings, plugins);
client.addTransportAddress(addr);
return client;
}
}
@Override
public synchronized void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
super.beforeTest(random, transportClientRatio);
public synchronized void beforeTest(Random random) throws IOException, InterruptedException {
super.beforeTest(random);
reset(true);
}
@ -2029,7 +1934,7 @@ public final class InternalTestCluster extends TestCluster {
if (excludedNodeIds.isEmpty() == false) {
logger.info("removing voting config exclusions for {} after restart/shutdown", excludedNodeIds);
try {
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client(random);
Client client = getRandomNodeAndClient(node -> excludedNodeIds.contains(node.name) == false).client();
client.execute(ClearVotingConfigExclusionsAction.INSTANCE, new ClearVotingConfigExclusionsRequest()).get();
} catch (InterruptedException | ExecutionException e) {
throw new AssertionError("unexpected", e);
@ -2453,7 +2358,7 @@ public final class InternalTestCluster extends TestCluster {
@Override
public Client next() {
return iterator.next().client(random);
return iterator.next().client();
}
@Override

View File

@ -73,9 +73,4 @@ public abstract class NodeConfigurationSource {
return Settings.EMPTY;
}
/** Returns plugins that should be loaded in the transport client */
public Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.emptyList();
}
}

View File

@ -77,7 +77,6 @@ import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.Requests;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.RestoreInProgress;
@ -260,8 +259,6 @@ import static org.hamcrest.Matchers.startsWith;
* <p>
* This class supports the following system properties (passed with -Dkey=value to the application)
* <ul>
* <li>-D{@value #TESTS_CLIENT_RATIO} - a double value in the interval [0..1] which defines the ration between node
* and transport clients used</li>
* <li>-D{@value #TESTS_ENABLE_MOCK_MODULES} - a boolean value to enable or disable mock modules. This is
* useful to test the system without asserting modules that to make sure they don't hide any bugs in production.</li>
* <li> - a random seed used to initialize the index random context.
@ -294,11 +291,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
public static final String SUITE_CLUSTER_NODE_PREFIX = "node_s";
public static final String TEST_CLUSTER_NODE_PREFIX = "node_t";
/**
* Key used to set the transport client ratio via the commandline -D{@value #TESTS_CLIENT_RATIO}
*/
public static final String TESTS_CLIENT_RATIO = "tests.client.ratio";
/**
* Key used to eventually switch to using an external cluster and provide its transport addresses
*/
@ -356,6 +348,11 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
*/
protected static final int DEFAULT_MAX_NUM_SHARDS = 10;
/**
* Key to provide the cluster name
*/
public static final String TESTS_CLUSTER_NAME = "tests.clustername";
/**
* The current cluster depending on the configured {@link Scope}.
* By default if no {@link ClusterScope} is configured this will hold a reference to the suite cluster.
@ -363,8 +360,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
private static TestCluster currentCluster;
private static RestClient restClient = null;
private static final double TRANSPORT_CLIENT_RATIO = transportClientRatio();
private static final Map<Class<?>, TestCluster> clusters = new IdentityHashMap<>();
private static OpenSearchIntegTestCase INSTANCE = null; // see @SuiteScope
@ -386,7 +381,7 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
protected final void beforeInternal() throws Exception {
final Scope currentClusterScope = getCurrentClusterScope();
Callable<Void> setup = () -> {
cluster().beforeTest(random(), getPerTestTransportClientRatio());
cluster().beforeTest(random());
cluster().wipe(excludeTemplates());
randomIndexTemplate();
return null;
@ -1124,13 +1119,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
/*
* The cluster state received by the transport client can miss customs that the client does not understand. This
* means that we only expect equality in the cluster state including customs if the master client and the local
* client are of the same type (both or neither are transport clients). Otherwise, we can only assert equality
* modulo non-core customs.
*/
if (isTransportClient(masterClient) == isTransportClient(client)) {
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
@ -1139,16 +1127,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)
);
} else {
// remove non-core customs and compare the cluster states
assertNull(
"cluster state JSON serialization does not match (after removing some customs)",
differenceBetweenMapsIgnoringArrayOrder(
convertToMap(removePluginCustoms(masterClusterState)),
convertToMap(removePluginCustoms(localClusterState))
)
);
}
} catch (final AssertionError error) {
logger.error(
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
@ -1263,21 +1241,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
}
}
/**
* Tests if the client is a transport client or wraps a transport client.
*
* @param client the client to test
* @return true if the client is a transport client or a wrapped transport client
*/
private boolean isTransportClient(final Client client) {
if (TransportClient.class.isAssignableFrom(client.getClass())) {
return true;
} else if (client instanceof RandomizingClient) {
return isTransportClient(((RandomizingClient) client).in());
}
return false;
}
private static final Set<String> SAFE_METADATA_CUSTOMS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetadata.TYPE, ScriptMetadata.TYPE))
);
@ -1501,8 +1464,7 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
}
/**
* Returns a random admin client. This client can either be a node or a transport client pointing to any of
* the nodes in the cluster.
* Returns a random admin client. This client can be a node pointing to any of the nodes in the cluster.
*/
protected AdminClient admin() {
return client().admin();
@ -1808,12 +1770,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
* negative value means that the number of client nodes will be randomized.
*/
int numClientNodes() default InternalTestCluster.DEFAULT_NUM_CLIENT_NODES;
/**
* Returns the transport client ratio. By default this returns <code>-1</code> which means a random
* ratio in the interval <code>[0..1]</code> is used.
*/
double transportClientRatio() default -1;
}
private class LatchedActionListener<Response> implements ActionListener<Response> {
@ -1966,24 +1922,7 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
return Collections.emptyList();
}
/**
* Returns a collection of plugins that should be loaded when creating a transport client.
*/
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.emptyList();
}
/**
* This method is used to obtain additional settings for clients created by the internal cluster.
* These settings will be applied on the client in addition to some randomized settings defined in
* the cluster. These settings will also override any other settings the internal cluster might
* add by default.
*/
protected Settings transportClientSettings() {
return Settings.EMPTY;
}
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws IOException {
private ExternalTestCluster buildExternalCluster(String clusterAddresses, String clusterName) throws IOException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
@ -1992,7 +1931,14 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
InetAddress inetAddress = InetAddress.getByName(url.getHost());
transportAddresses[i++] = new TransportAddress(new InetSocketAddress(inetAddress, url.getPort()));
}
return new ExternalTestCluster(createTempDir(), externalClusterClientSettings(), transportClientPlugins(), transportAddresses);
return new ExternalTestCluster(
createTempDir(),
externalClusterClientSettings(),
getClientWrapper(),
clusterName,
nodePlugins(),
transportAddresses
);
}
protected Settings externalClusterClientSettings() {
@ -2009,7 +1955,11 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
if (scope == Scope.TEST) {
throw new IllegalArgumentException("Cannot run TEST scope test with " + TESTS_CLUSTER);
}
return buildExternalCluster(clusterAddresses);
String clusterName = System.getProperty(TESTS_CLUSTER_NAME);
if (Strings.isNullOrEmpty(clusterName)) {
throw new IllegalArgumentException("Missing tests.clustername system property");
}
return buildExternalCluster(clusterAddresses, clusterName);
}
final String nodePrefix;
@ -2064,10 +2014,8 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
private NodeConfigurationSource getNodeConfigSource() {
Settings.Builder initialNodeSettings = Settings.builder();
Settings.Builder initialTransportClientSettings = Settings.builder();
if (addMockTransportService()) {
initialNodeSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
initialTransportClientSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
}
return new NodeConfigurationSource() {
@Override
@ -2087,24 +2035,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
public Collection<Class<? extends Plugin>> nodePlugins() {
return OpenSearchIntegTestCase.this.nodePlugins();
}
@Override
public Settings transportClientSettings() {
return Settings.builder()
.put(initialTransportClientSettings.build())
.put(OpenSearchIntegTestCase.this.transportClientSettings())
.build();
}
@Override
public Collection<Class<? extends Plugin>> transportClientPlugins() {
Collection<Class<? extends Plugin>> plugins = OpenSearchIntegTestCase.this.transportClientPlugins();
if (plugins.contains(getTestTransportPlugin()) == false) {
plugins = new ArrayList<>(plugins);
plugins.add(getTestTransportPlugin());
}
return Collections.unmodifiableCollection(plugins);
}
};
}
@ -2212,35 +2142,6 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase {
}
}
/**
* Returns the client ratio configured via
*/
private static double transportClientRatio() {
String property = System.getProperty(TESTS_CLIENT_RATIO);
if (property == null || property.isEmpty()) {
return Double.NaN;
}
return Double.parseDouble(property);
}
/**
* Returns the transport client ratio from the class level annotation or via
* {@link System#getProperty(String)} if available. If both are not available this will
* return a random ratio in the interval {@code [0..1]}.
*/
protected double getPerTestTransportClientRatio() {
final ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
double perTestRatio = -1;
if (annotation != null) {
perTestRatio = annotation.transportClientRatio();
}
if (perTestRatio == -1) {
return Double.isNaN(TRANSPORT_CLIENT_RATIO) ? randomDouble() : TRANSPORT_CLIENT_RATIO;
}
assert perTestRatio >= 0.0 && perTestRatio <= 1.0;
return perTestRatio;
}
/**
* Returns path to a random directory that can be used to create a temporary file system repo
*/

View File

@ -67,8 +67,6 @@ public abstract class TestCluster implements Closeable {
protected Random random;
protected double transportClientRatio = 0.0;
public TestCluster(long seed) {
this.seed = seed;
}
@ -80,10 +78,7 @@ public abstract class TestCluster implements Closeable {
/**
* This method should be executed before each test to reset the cluster to its initial state.
*/
public void beforeTest(Random random, double transportClientRatio) throws IOException, InterruptedException {
assert transportClientRatio >= 0.0 && transportClientRatio <= 1.0;
logger.debug("Reset test cluster with transport client ratio: [{}]", transportClientRatio);
this.transportClientRatio = transportClientRatio;
public void beforeTest(Random random) throws IOException, InterruptedException {
this.random = new Random(random.nextLong());
}

View File

@ -1,85 +0,0 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.transport;
import org.opensearch.client.transport.TransportClient;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.transport.nio.MockNioTransportPlugin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@SuppressWarnings({ "unchecked", "varargs" })
@Deprecated
public class MockTransportClient extends TransportClient {
private static final Settings DEFAULT_SETTINGS = Settings.builder()
.put("transport.type.default", MockNioTransportPlugin.MOCK_NIO_TRANSPORT_NAME)
.build();
public MockTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
this(settings, Arrays.asList(plugins));
}
public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins) {
this(settings, plugins, null);
}
public MockTransportClient(Settings settings, Collection<Class<? extends Plugin>> plugins, HostFailureListener listener) {
super(settings, DEFAULT_SETTINGS, addMockTransportIfMissing(settings, plugins), listener);
}
private static Collection<Class<? extends Plugin>> addMockTransportIfMissing(
Settings settings,
Collection<Class<? extends Plugin>> plugins
) {
boolean settingExists = NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings);
String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings);
if (settingExists == false || MockNioTransportPlugin.MOCK_NIO_TRANSPORT_NAME.equals(transportType)) {
if (plugins.contains(MockNioTransportPlugin.class)) {
return plugins;
} else {
plugins = new ArrayList<>(plugins);
plugins.add(MockNioTransportPlugin.class);
return plugins;
}
}
return plugins;
}
public NamedWriteableRegistry getNamedWriteableRegistry() {
return namedWriteableRegistry;
}
}

View File

@ -184,7 +184,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
bootstrapMasterNodeIndex = maxNumDataNodes == 0 ? -1 : randomIntBetween(0, maxNumDataNodes - 1);
}
final int numClientNodes = randomIntBetween(0, 2);
String transportClient = getTestTransportType();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@ -203,11 +202,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Settings transportClientSettings() {
return Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
}
};
String nodePrefix = "foobar";
@ -249,11 +243,11 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
try {
{
Random random = new Random(seed);
cluster0.beforeTest(random, random.nextDouble());
cluster0.beforeTest(random);
}
{
Random random = new Random(seed);
cluster1.beforeTest(random, random.nextDouble());
cluster1.beforeTest(random);
}
assertArrayEquals(cluster0.getNodeNames(), cluster1.getNodeNames());
Iterator<Client> iterator1 = cluster1.getClients().iterator();
@ -277,7 +271,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
final int maxNumDataNodes = 2;
final int numClientNodes = randomIntBetween(0, 2);
final String clusterName1 = "shared1";
String transportClient = getTestTransportType();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@ -292,11 +285,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Settings transportClientSettings() {
return Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
}
};
String nodePrefix = "test";
Path baseDir = createTempDir();
@ -315,7 +303,7 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
Function.identity()
);
try {
cluster.beforeTest(random(), 0.0);
cluster.beforeTest(random());
final int originalMasterCount = cluster.numMasterNodes();
final Map<String, Path[]> shardNodePaths = new HashMap<>();
for (String name : cluster.getNodeNames()) {
@ -348,7 +336,7 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
Files.createDirectories(newTestMarker);
final String newNode3 = cluster.startNode(poorNodeDataPathSettings);
assertThat(getNodePaths(cluster, newNode3)[0], equalTo(dataPath));
cluster.beforeTest(random(), 0.0);
cluster.beforeTest(random());
assertFileNotExists(newTestMarker); // the cluster should be reset for a new test, cleaning up the extra path we made
assertFileNotExists(testMarker); // a new unknown node used this path, it should be cleaned
assertFileExists(stableTestMarker); // but leaving the structure of existing, reused nodes
@ -356,7 +344,7 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
assertThat("data paths for " + name + " changed", getNodePaths(cluster, name), equalTo(shardNodePaths.get(name)));
}
cluster.beforeTest(random(), 0.0);
cluster.beforeTest(random());
assertFileExists(stableTestMarker); // but leaving the structure of existing, reused nodes
for (String name : cluster.getNodeNames()) {
assertThat("data paths for " + name + " changed", getNodePaths(cluster, name), equalTo(shardNodePaths.get(name)));
@ -379,7 +367,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
final Path baseDir = createTempDir();
final int numNodes = 5;
String transportClient = getTestTransportType();
InternalTestCluster cluster = new InternalTestCluster(
randomLong(),
baseDir,
@ -404,18 +391,13 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Settings transportClientSettings() {
return Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
}
},
0,
"",
mockPlugins(),
Function.identity()
);
cluster.beforeTest(random(), 0.0);
cluster.beforeTest(random());
List<DiscoveryNodeRole> roles = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
final DiscoveryNodeRole role = i == numNodes - 1 && roles.contains(DiscoveryNodeRole.MASTER_ROLE) == false
@ -474,7 +456,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
}
public void testTwoNodeCluster() throws Exception {
String transportClient = getTestTransportType();
NodeConfigurationSource nodeConfigurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
@ -489,11 +470,6 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
@Override
public Settings transportClientSettings() {
return Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build();
}
};
String nodePrefix = "test";
Path baseDir = createTempDir();
@ -514,7 +490,7 @@ public class InternalTestClusterTests extends OpenSearchTestCase {
Function.identity()
);
try {
cluster.beforeTest(random(), 0.0);
cluster.beforeTest(random());
switch (randomInt(2)) {
case 0:
cluster.stopRandomDataNode();